Tutoriale / Pierwsza kolejka komunikatów

668153da-01e7-c6d6-e3d5-b0170820ba13

Producent i konsument komunikatów

Skoro mamy już wszystko przygotowane (działające środowisko PHP), pora na odrobinę teorii na temat działania RabbitMQ.

Na samym początku przygody z pośrednikiem komunikatów RabbitMQ przyjmiemy, że nadawcę komunikatów nazywamy producentem, a odbiorcę komunikatów konsumentem. Zaś serwer RabbitMQ po prostu kolejką.

Aby lepiej zilustrować zależności pomiędzy tymi obiektami, posłużmy się grafikami.

Producenta (aplikację nadawcy) oznaczmy jako:

Aplikacja nadawcy

Kolejkę serwera RabbitMQ jako:

Kolejka komunikatów

Zaś konsumenta (aplikację odbiorcy) jako:

Aplikacja odbiorcy

Cały diagram prezentujący drogę wiadomości od nadawcy do odbiorcy wygląda następująco:

Diagram: producent - kolejka komunikatów - konsument

Kolejka może przechowywać dowolną liczbę komunikatów – ogranicza ją tylko rozmiar pamięci RAM i miejsce na dysku twardym, na którym zainstalowany jest pośrednik komunikatów.

Pierwsza para programów

Nadszedł czas by napisać nasz pierwszy, prawdziwy program służący do wysyłania komunikatów do kolejki RabbitMQ.

W zasadzie do napisania mamy dwa programy:

  • pierwszy będący nadawcą-producentem, który wysyła dane do naszego kuriera komunikatów (nadaj.php)
  • oraz drugi - będący odbiorcą-konsumentem, a więc będzie odbierał dane z kolejki.

Program nadaj.php

W pliku nadaj.php potrzebujemy na początku zaimportować dwa pliki, z których będziemy korzystać we wszystkich programach napisanych w ramach tej serii tutoriali.

Po pierwsze będzie to załadowanie zewnętrznych bibliotek za pomocą funkcji autoload programu Composer:

require_once __DIR__ . '/vendor/autoload.php';

Następnie importujemy drugi plik, tym razem będzie to obsługa zakończania pracy naszych skryptów PHP, które działałyby non-stop:

require_once __DIR__ . '/shutdown.php';

Ten drugi plik (shutdown.php) ma następującą zawartość:

/**
 * Executes when application close
 *
 * @param \PhpAmqpLib\Channel\AMQPChannel $channel
 * @param \PhpAmqpLib\Connection\AbstractConnection $connection
 */
function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();

    echo PHP_EOL, 'Aktywne połączenia zostały pomyślnie zakończone.', PHP_EOL;
}

W pliku znajduje się ciało funkcji shutdown(), która ma pozwolić na zamknięcie połączenia z serwerem RabbitMQ, a nie jego zerwanie.

Gdy powrócimy do pliku nadaj.php w kolejnych liniach widzimy deklarację użycia dwóch klas z biblioteki PhpAmqpLib:

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

Następnie inicjujemy połączenie z kurierem wiadomości:

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

Kolejnym etapem w naszym programie jest przypisanie do zmiennej $channel kanału połączenia AMQP.

Aby wysłać komunikat do serwera RabbitMQ, potrzebujemy zdefiniować kolejkę i jej parametry.

W naszym przykładzie kolejka będzie nazywała się hello i będzie nietrwała, o czym świadczy parametr durable ustawiony na false:

$queue = 'hello';

$channel->queue_declare($queue = 'hello', false, false, false, false);

Użyta funckja queue_declare() ma następującą postać:

/**
 * Declares queue, creates if needed
 *
 * @param string $queue
 * @param bool $passive
 * @param bool $durable
 * @param bool $exclusive
 * @param bool $auto_delete
 * @param bool $nowait
 * @param array $arguments
 * @param int $ticket
 * @return mixed|null
 */
public function queue_declare(
    $queue = '',
    $passive = false,
    $durable = false,
    $exclusive = false,
    $auto_delete = true,
    $nowait = false,
    $arguments = array(),
    $ticket = null
) {
    // instrukcje
}

Szerzej na temat trwałości kolejki omówimy przy drugiej parze programów w tej serii tutoriali.

Deklaracja kolejki inicjuje ją tylko w przypadku, gdy nie została ona wcześniej utworzona. Jeśli kolejka istnieje, to powtórna deklaracja jej w naszym programie nie zmieni jej stanu (ang. idempotent) i nie spowoduje zgłoszenia błędu.

Kompletny kod programu nadaj.php prezentuje się następująco:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$queue = 'hello';

$channel->queue_declare($queue = 'hello', false, false, false, false);

$text = 'Witamy!';

$message = new AMQPMessage($text);

$channel->basic_publish($message, '', $queue);

echo " [x] Wysłano '$text'\n";


register_shutdown_function('shutdown', $channel, $connection);

Stan kolejki po wysłaniu komunikatu możemy sprawdzić w panelu administracyjnym:

Stan kolejki hello po wysłaniu pojedynczego komunikatu

Ilustruje on sytuację, gdy wysłano 1 komunikat i nie posiada on jeszcze odbiorcy (żaden program nie zgłosił się po niego), tak więc ten jeden komunikat czeka w kolejce na odbiór.

Program odbierz.php

Na początku również definiujemy kolejkę o nazwie hello. Robimy to ponieważ program konsumenta komunikatów może być uruchomiony przed programem wysyłającym dane, co ma znaczenie, gdy tak skonfigurowana kolejka nie istnieje jeszcze w systemie RabbitMQ.

Kolejnym elementem jest zainicjowanie reagowania na pojawiające się w kolejce komunikaty:

$channel->basic_consume(
    $queue,
    '',
    false,
    true,
    false,
    false,
    function($message) {
        // instrukcje konsumowania komunikatu
    }
);

Gdy takowe się pojawią nasz program automatycznie je pobierze z pośrednika komunikatów, zmniejszając tym samym liczbę pakietów oczekujących w kolejce.

Zatem za każdym razem, gdy serwer prześle do naszego odbiorcy komunikat przechowywany w kolejce, wywołany zostanie następujący kod, który drukuje otrzymaną treść komunikatu w terminalu:

echo " [x] Odebrano ", $message->body, "\n";

Rezultat uruchomienia programu odbierz.php jest następujący:

$ php odbierz.php
 [*] Oczekiwanie na wiadomości w hello.
     Naciśnij CTRL+C aby zakończyć.
 [x] Odebrano Witamy!

Efekt działania tego programu możemy zobaczyć również w panelu administracyjnym kolejki hello:

Stan kolejki hello po wysłaniu pojedynczego komunikatu

Jak widać, komunikat przechowywany w kolejce znalazł odbiorcę, dzięki czemu kolejka jest teraz pusta.

Oto pełny kod programu odbierz.php:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();

$queue = 'hello';

$channel->queue_declare($queue, false, false, false, false);

echo sprintf(' [*] Oczekiwanie na wiadomości w %s.', $queue), "\n";
echo '     Naciśnij CTRL+C aby zakończyć.', "\n";

$channel->basic_consume(
    $queue,
    '',
    false,
    true,
    false,
    false,
    function($message) {
        echo " [x] Odebrano ", $message->body, "\n";
    }
);

register_shutdown_function('shutdown', $channel, $connection);

while(count($channel->callbacks)) {
    $channel->wait();
}

Wymaga w nim wyjaśnienia jedna rzecz: otóż metodę $channel->basic_consume(), wykonywaną na obiekcie połączenia AMQP, wywołujemy z parametrem $no_ack ustawionym na true.

/**
 * Starts a queue consumer
 *
 * @param string $queue
 * @param string $consumer_tag
 * @param bool $no_local
 * @param bool $no_ack
 * @param bool $exclusive
 * @param bool $nowait
 * @param callable|null $callback
 * @param int|null $ticket
 * @param array $arguments
 * @return mixed|string
 */
public function basic_consume(
    $queue = '',
    $consumer_tag = '',
    $no_local = false,
    $no_ack = false,
    $exclusive = false,
    $nowait = false,
    $callback = null,
    $ticket = null,
    $arguments = array()
) {
    // instrukcje
}

Oznacza to, że odbiorca komunikatu nie potwierdza (ang. no acknowledgement), iż komunikat odczytał. Innymi słowy, serwer RabbitMQ przesyła do konsumenta komunikat, nie sprawdzając czy dotarła ona prawidłowo, czy może została utracona, bo w trakcie jej przetwarzania program odbiorcy zakończył działanie i nie zdążył skończyć wykonywania jej obróbki (np. zapisać w bazie danych).

Jest to sytuacja jak w tradycyjnej poczcie - list został zabrany z placówki pocztowej, ale nie wiadomo, czy dotarł do odbiorcy.

W przypadku poczty - rozwiązaniem takiej sytuacji jest wysyłanie za potwierdzeniem odbioru.

Jak widać, analogii systemu RabbitMQ do zasad doręczania przesyłek za pomocą tradycyjnej poczty czy kuriera jest więcej. Do tego tematu powrócimy podczas pisania drugiej pary przykładowych programów.

Repozytorium z kodem źródłowym

Kod programów omawianych w tym tutorialu znajduje się pod adresem:
https://github.com/RattiQue/tutorials-pl-php

Tomasz Kuter

Web Developer z ponad 8-letnim, komercyjnym doświadczeniem w tworzeniu stron i aplikacji internetowych oraz paneli administracyjnych w PHP, JavaScript, HTML i CSS.
Aktualnie zainteresowany architekturą mikroserwisów, które umożliwiają budowanie skalowalnych aplikacji internetowych.