Tutoriale / Pierwsza kolejka komunikatów

8276ccbe-8923-72c0-0de8-08b28709f49f

Producent i konsument komunikatów

Skoro mamy już wszystko przygotowane (działające środowisko node.js), pora na odrobinę teorii na temat działania RabbitMQ.

Na samym początku przygody z pośrednikiem komunikatów RabbitMQ przyjmiemy, że nadawcę komunikatów nazywamy producentem, a odbiorcę komunikatów konsumentem. Zaś serwer RabbitMQ po prostu kolejką.

Aby lepiej zilustrować zależności pomiędzy tymi obiektami, posłużmy się grafikami.

Producenta (aplikację nadawcy) oznaczmy jako:

Aplikacja nadawcy

Kolejkę serwera RabbitMQ jako:

Kolejka komunikatów

Zaś konsumenta (aplikację odbiorcy) jako:

Aplikacja odbiorcy

Cały diagram prezentujący drogę wiadomości od nadawcy do odbiorcy wygląda następująco:

Diagram: producent - kolejka komunikatów - konsument

Kolejka może przechowywać dowolną liczbę komunikatów – ogranicza ją tylko rozmiar pamięci RAM i miejsce na dysku twardym, na którym zainstalowany jest pośrednik komunikatów.

Pierwsza para programów

Nadszedł czas by napisać nasz pierwszy, prawdziwy program służący do wysyłania komunikatów do kolejki RabbitMQ.

W zasadzie do napisania mamy dwa programy:

  • pierwszy będący nadawcą-producentem, który wysyła dane do naszego kuriera komunikatów (nadaj.js)
  • oraz drugi - będący odbiorcą-konsumentem, a więc będzie odbierał dane z kolejki.

Program nadaj.js

W pliku nadaj.js potrzebujemy na początku zaimportować bibliotekę amqplib, a następnie zainicjować połączenie z kurierem wiadomości:

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn) {});

Kolejnym etapem w naszym programie jest utworzenie kanału połączenia AMQP, w którym będziemy umieszczać zarówno kod wysyłający dane w programie producenta, jak i je odbierający w programie konsumenta:

#!/usr/bin/env node
amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) {}); });

Aby wysłać komunikat do serwera RabbitMQ, potrzebujemy zdefiniować kolejkę i jej parametry.

W naszym przykładzie kolejka będzie nazywała się hello i będzie nietrwała, o czym świadczy parametr durable ustawiony na false. Szerzej na ten temat omówimy przy drugiej parze programów w cyklu tutoriali.

Deklaracja kolejki inicjuje ją tylko w przypadku, gdy nie została ona wcześniej utworzona. Jeśli kolejka istnieje, to powtórna deklaracja jej w naszym programie nie zmieni jej stanu (ang. idempotent) i nie spowoduje zgłoszenia błędu.

Kompletny kod programu nadaj.js prezentuje się następująco:

#!/usr/bin/env node
amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) { var q = 'hello'; ch.assertQueue(q, {durable: false}); ch.sendToQueue(q, new Buffer('Witamy!')); console.log(" [x] Wysłano 'Witamy!'"); }); setTimeout(function() { conn.close(); process.exit(0) }, 500); });

Stan kolejki po wysłaniu komunikatu możemy sprawdzić w panelu administracyjnym:

Stan kolejki hello po wysłaniu pojedynczego komunikatu

Ilustruje on sytuację, gdy wysłano 1 komunikat i nie posiada on jeszcze odbiorcy (żaden program nie zgłosił się po niego), tak więc ten jeden komunikat czeka w kolejce na odbiór.

Program odbierz.js

Na początku również definiujemy kolejkę o nazwie hello. Robimy to ponieważ program konsumenta komunikatów może być uruchomiony przed programem wysyłającym dane, co ma znaczenie, gdy tak skonfigurowana kolejka nie istnieje jeszcze w systemie RabbitMQ.

Kolejnym elementem jest zainicjowanie reagowania na pojawiające się w kolejce komunikaty:

ch.consume(q, function(msg) {});

Gdy takowe się pojawią nasz program automatycznie je pobierze z pośrednika komunikatów, zmniejszając tym samym liczbę pakietów oczekujących w kolejce.

Zatem za każdym razem, gdy serwer prześle do naszego odbiorcy komunikat przechowywany w kolejce, wywołany zostanie następujący kod, który drukuje otrzymaną treść komunikatu w terminalu:

console.log(" [x] Odebrano %s", msg.content.toString());

Rezultat uruchomienia programu odbierz.js jest następujący:

$ ./odbierz.js
 [*] Oczekiwanie na wiadomości w hello.
     Naciśnij CTRL+C aby zakończyć.
 [x] Odebrano Witamy!

Efekt działania tego programu możemy zobaczyć również w panelu administracyjnym kolejki hello:

Stan kolejki hello po wysłaniu pojedynczego komunikatu

Jak widać, komunikat przechowywany w kolejce znalazł odbiorcę, dzięki czemu kolejka jest teraz pusta.

Oto pełny kod programu odbierz.js:

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    console.log(" [*] Oczekiwanie na wiadomości w %s.", q);
    console.log("     Naciśnij CTRL+C aby zakończyć.");
    ch.consume(q, function(msg) {
      console.log(" [x] Odebrano %s", msg.content.toString());
    }, {noAck: true});
  });
});

Wymaga w nim wyjaśnienia jedna rzecz: otóż metodę consume(), wykonywaną na obiekcie połączenia AMQP, wywołujemy z parametrem noAck ustawionym na true.

Oznacza to, że odbiorca komunikatu nie potwierdza (ang. no acknowledgement), iż komunikat odczytał. Innymi słowy, serwer RabbitMQ przesyła do konsumenta komunikat, nie sprawdzając czy dotarła ona prawidłowo, czy może została utracona, bo w trakcie jej przetwarzania program odbiorcy zakończył działanie i nie zdążył skończyć wykonywania jej obróbki (np. zapisać w bazie danych).

Jest to sytuacja jak w tradycyjnej poczcie - list został zabrany z placówki pocztowej, ale nie wiadomo, czy dotarł do odbiorcy.

W przypadku poczty - rozwiązaniem takiej sytuacji jest wysyłanie za potwierdzeniem odbioru.

Jak widać, analogii systemu RabbitMQ do zasad doręczania przesyłek za pomocą tradycyjnej poczty czy kuriera jest więcej. Do tego tematu powrócimy podczas pisania drugiej pary przykładowych programów.

Repozytorium z kodem źródłowym

Kod programów omawianych w tym tutorialu znajduje się pod adresem:
https://github.com/RattiQue/tutorials-pl-node.js

Tomasz Kuter

Web Developer z ponad 8-letnim, komercyjnym doświadczeniem w tworzeniu stron i aplikacji internetowych oraz paneli administracyjnych w PHP, JavaScript, HTML i CSS.
Aktualnie zainteresowany architekturą mikroserwisów, które umożliwiają budowanie skalowalnych aplikacji internetowych.