Tutoriale / Wydawca komunikatów i subskrybenci

f60a45e2-ace4-319a-e430-3c2559d96aab

Do tej pory (jeśli podążasz za kolejnymi tutorialami) mieliśmy do czynienia tylko z kolejkami. Główną zasadą ich działania było to, że pojedynczy komunikat był przekazywany wyłącznie do jednego odbiorcy, co można porównać do przesyłania listów od nadawcy do odbiorcy.

W ten sposób można było lepiej rozłożyć przetwarzanie zadań w przypadku, gdy dysponowaliśmy większą liczbą robotników.

Teraz zajmiemy się czymś odrobinę bardziej złożonym, a mianowicie dostarczaniem tego samego komunikatu do wielu subskrybentów, czyli czymś w rodzaju dystrybucji prasy czy książek.

Producenta komunikatów będziemy nazywać wydawcą (ang. publisher), zaś konsumentów (odbiorców) komunikatów nazywać będziemy subskrybentami (ang. subscribers).

Centrala komunikatów

Komunikat jest przekazywany przez wydawcę:
Obrazek przedstawiający wydawcę komunikatów
do centrali komunikatów (ang. exchange):
Obrazek przedstawiający centralę komunikatów

Ta dodatkowa warstwa zwana jest w terminologii RabbitMQ pod angielskim słowem exchange, a w wolnym tłumaczeniu tablicą routing'u o określonej nazwie lub centralą komunikatów, nawiązującej pełnioną funkcją do centrali telefonicznych (ang. telephone exchange).

Podstawową cechą centrali komunikatów jest filtrowanie i/lub dyspozycja komunikatów na bazie określonych reguł.

Można to zrobić na kilka sposobów co zostanie pokazane w kolejnych częściach tutoriali.

Poniższy diagram ilustruje pełny przepływ komunikatów przetwarzanych przez RabbitMQ w centrali rozgłośni komunikatów (typu fanout):

Diagram przedstawiający centralę rozgłośni komunikatów

Główną różnicą w porównaniu do schematu omawianego wcześniej jest fakt, iż komunikat może zostać utracony, gdy nie ma żadnego subskrybenta, zatem komunikat nie jest odkładany w żadnej kolejce dla nieistniejącego odbiorcy.

Dzieje się tak, gdyż dla każdego odbiorcy tworzone są tymczasowe kolejki.

W ramach tego tutoriala napiszemy dwa programy:

  • wydawca.php – publikujący nowe wydanie magazynu w centrali rozgłośni komunikatów,
  • oraz czytelnik.php – subskrybujący treści przesyłane przez wydawcę, również korzystający z centrali rozgłośni komunikatów.

Program wydawca.php

W programie tym ponownie będziemy bazować na kodzie jaki stworzyliśmy w ramach poprzednich tutoriali, czyli rozszerzymy następujący program o nowe instrukcje:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

// nowe instrukcje

register_shutdown_function('shutdown', $channel, $connection);

Najpierw definiujemy nazwę centrali komunikatów:

$exchangeName = 'empik';

i deklarujemy jej parametry (w tym wypadku typ fanout):

$channel->exchange_declare($exchangeName, 'fanout', false, false, false);

następnie tworzymy nowe wydanie jednego magazynu/kilku magazynów:

$date  = new DateTime();
$issue = ((int) $date->format('n') + 1) . '/' . $date->format('Y');

$data = implode(' ', array_slice($argv, 1));

if (empty($data)) {
    $data = 'aktualne wydanie ' . $issue;
}

a na końcu publikujemy komunikat:

$channel->basic_publish($message, $exchangeName);

Cały program wydawca.php wygląda więc następująco:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$exchangeName = 'empik';

$channel->exchange_declare($exchangeName, 'fanout', false, false, false);

$date  = new DateTime();
$issue = ((int) $date->format('n') + 1) . '/' . $date->format('Y');

$data = implode(' ', array_slice($argv, 1));

if (empty($data)) {
    $data = 'aktualne wydanie ' . $issue;
}

$message = new AMQPMessage($data);

$channel->basic_publish($message, $exchangeName);

echo " [x] Wysłano ", $data, "\n";

register_shutdown_function('shutdown', $channel, $connection);

Jeśli teraz spróbujemy wydać jakiś magazyn np. aktualne wydanie

Terminal1 $ php wydawca.php 
 [x] Wysłano: aktualne wydanie 7/2018

lub jedno z poprzednich:

Terminal1 $ php wydawca.php 2/2016
 [x] Wysłano: 2/2016

to rezultatem tego będzie utrata wysłanych komunikatów, gdyż – zgodnie z tym co powiedziano wcześniej – brak aktywnych subskrybentów komunikatów oznacza utracenie przesyłanych danych.

Program czytelnik.php

Napiszmy więc teraz program czytelnik.php

W nim ponownie bazujemy na kodzie źródłowym poprzednich programów-odbiorców:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();

// nowe instrukcje

register_shutdown_function('shutdown', $channel, $connection);

while(count($channel->callbacks)) {
    $channel->wait();
}

Znowu definiujemy nazwę centrali komunikatów:

$exchangeName = 'empik';

i jej typ:

$channel->exchange_declare($exchangeName, 'fanout', false, false, false);

Następnie tworzymy tymczasową kolejkę, z której korzystać będzie wyłącznie pojedyncza instancja naszego program czytelinik.php:

list($queueName, ,) = $channel->queue_declare('', false, false, true, false);

$channel->queue_bind($queueName, $exchangeName);

echo sprintf(' [*] Oczekiwanie na wiadomości w %s.', $queueName), "\n";
echo '     Naciśnij CTRL+C aby zakończyć.', "\n";

$channel->basic_consume(
    $queueName,
    '',
    false,
    true,
    false,
    false,
    function($message) {
        echo " [x] Odebrano ", $message->body, "\n";
    }
);

Pełny kod źródłowy programu będzie wyglądał jak poniżej:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();

$exchangeName = 'empik';

$channel->exchange_declare($exchangeName, 'fanout', false, false, false);

list($queueName, ,) = $channel->queue_declare('', false, false, true, false);

$channel->queue_bind($queueName, $exchangeName);

echo sprintf(' [*] Oczekiwanie na wiadomości w %s.', $queueName), "\n";
echo '     Naciśnij CTRL+C aby zakończyć.', "\n";

$channel->basic_consume(
    $queueName,
    '',
    false,
    true,
    false,
    false,
    function($message) {
        echo " [x] Odebrano ", $message->body, "\n";
    }
);

register_shutdown_function('shutdown', $channel, $connection);

while(count($channel->callbacks)) {
    $channel->wait();
}

Gdy teraz uruchomimy nasz program czytelnik.php zobaczymy, iż korzysta on z tymczasowej kolejki o wygenerowanej przez RabbitMQ nazwie np.

Terminal2 $ php czytelnik.php
 [*] Oczekiwanie na wydanie w amq.gen-RHJP1OPXQifo0ctvaDMFww.
     Naciśnij CTRL+C aby zakończyć.

Pora teraz ponownie uruchomić program wydawca.js tak jak poprzednim razem:

Terminal1 $ php wydawca.php
 [x] Wysłano: aktualne wydanie 3/2018
Terminal1 $ php wydawca.php 2/2016
 [x] Wysłano: 2/2016
Terminal1 $ php wydawca.php 5/2015 6/2015 9/2015
 [x] Wysłano: 5/2015 6/2015 9/2015

W drugim terminalu zobaczymy działanie naszego czytelnika:

Terminal2 $ php czytelnik.php
 [*] Oczekiwanie na wydanie w amq.gen-RHJP1OPXQifo0ctvaDMFww.
     Naciśnij CTRL+C aby zakończyć.
 [x] Odebrano aktualne wydanie 3/2018
 [x] Odebrano 2/2016
 [x] Odebrano 5/2015 6/2015 9/2015

Jeśli teraz w trzeciej konsoli ponownie uruchomimy program czytelnik.php – zobaczymy – tak jak wcześniej wspomniano, iż system RabbitMQ przydzielił mu inną kolejkę niż programowi w drugim terminalu:

Terminal3 $ php czytelnik.php
 [*] Oczekiwanie na wydanie w amq.gen-95Qaryoo-GWBXUQeZP01OQ.
     Naciśnij CTRL+C aby zakończyć.

Tak więc z punktu nadawcy-wydawcy – nic się nie zmienia – choć nie wie on ilu odbiorców otrzymało przesłany komunikat lub czy ktokolwiek ją otrzymał. Inaczej wygląda to od strony subskrybenta.

Sytuację tą można zobrazować jeszcze w następujący sposób:

  • posiadamy sieć komputerową składającą się z czterech komputerów, jednak jeden jest wyłączony;
  • jeden z trzech komputerów pełni funkcję nadawcy,
  • dwa pozostałe subskrybują kanał centrali komunikatów, do którego nadawca przesyła informacje;
  • pierwsze komunikaty są wysyłane w momencie, gdy jeden komputer nie działa;
  • kolejne, gdy jest on już włączony;
  • a ostatnie, gdy znowu nie działa.

Gdybyśmy mieli przetrzymywać komunikaty dla początkowo wyłączonego komputera, a następnie je z nim synchronizować, to rezultatem takiego działania byłoby nieustanne zwiększanie ilości danych do przechowania i przesłania odbiorcy.

Takie podejście ma swoje plusy i minusy. Wszystko zależy od rodzaju komunikatów i tego co chcemy z nimi zrobić.

Jeśli byłyby to np. wiadomości przesyłane w ramach czata – nie moglibyśmy sobie pozwolić na ich utratę. Natomiast np. w przypadku logów aplikacji, czy usługi – możemy skorzystać z takiego rozwiązania zakładając, że zawsze będzie obecna co najmniej jedna aplikacja subskrybentów.

Przykładem takiego rozwiązania może być pobieranie logów i zapisywanie ich na dysku lub w bazie danych przez pierwszą aplikację-odbiorcę oraz drukowanie ich w konsoli / terminalu przez drugi program – gdy potrzebujemy zweryfikować działanie naszej aplikacji w różnych warunkach.

Przykładowe zastosowania

Centrale komunikatów typu fanout można stosować przy obsłudze:

  • tabel wyników czy informacji o globalnych wydarzeniach w grach typu Massively Multiplayer Online (w skrócie MMO)
  • informacji o wynikach sportowych oraz ich aktualizacjach przekazywanych w czasie „prawie rzeczywistym” na poświęconych temu serwisach internetowych przeznaczonych dla użytkowników urządzeń mobilnych oraz komputerów,
  • rozproszonych systemów transmitujących aktualne informacje różnego rodzaju na temat zmian stanu określonych obiektów lub ich konfiguracji.

Repozytorium z kodem źródłowym

Kod programów omawianych w tym tutorialu znajduje się pod adresem:
https://github.com/RattiQue/tutorials-pl-php

Tomasz Kuter

Web Developer z ponad 8-letnim, komercyjnym doświadczeniem w tworzeniu stron i aplikacji internetowych oraz paneli administracyjnych w PHP, JavaScript, HTML i CSS.
Aktualnie zainteresowany architekturą mikroserwisów, które umożliwiają budowanie skalowalnych aplikacji internetowych.