Tutoriale / Centrala komunikatów tematycznych

1810d4f7-f068-db87-4186-0ebfc8467b27

Ten rodzaj filtrowania wiadomości jest alternatywą dla omówionej w poprzednim tutorialu centrali komunikatów z nagłówkami, jednak nie może on posiadać dowolnie dobranego klucza routing’u, tak jak to miało miejsce w przypadku centrali komunikatów bezpośrednich.

Ściśle określony klucz powiązania

Tutaj klucz powiązania musi składać się ze słów oddzielonych kropkami np. „gpw.notowania.spolki.kghm”, co można opisać następującym wyrażeniem regularnym:

^[A-Za-z0-9\-_*]+(\.[A-Za-z0-9\-_*]+)*$.

Do konkretnej kolejki zostaną dostarczone wszystkie wiadomości, dla których klucz będzie pasował.

Do jednej kolejki można przypisać kilka takich kluczy, tak jak miało to miejsce w przypadku centrali typu direct. W takim przypadku wystarczy, aby choć jeden klucz pasował, co jest podobne do filtrowania typu „any” w centralach typu headers.

Klucz routing'u składa się ze słów oddzielonych kropkami.

Pojedyncze słowa mogą zostać zastąpione przez:

  • gwiazdki „*” (ang. asterix) zastępujące dokładnie jedno słowo
  • oraz tzw. płotka lub krzyżyka „#” (ang. hash) zastępującego dowolną liczbę słów (zero lub więcej), w tym oddzielających je kropek.

Część klucza routingu, który chcemy zastąpić płotkiem np. „gpw.#”, nie może zaczynać się od kropki, co możemy zapisać następującym wyrażeniem regularnym:

^([^.][A-Za-z0-9\-_*]+)((\.[A-Za-z0-9\-_*]+)*)$

Centrale tematyczne mają wiele zastosowań.

Gdy zadanie stawiane przed centralą filtrującą komunikaty przewiduje wiele aplikacji subskrybujących je selektywnie, ten rodzaj routing'u jest jednym z najlepszych wyborów.

Świetnym przykładem zastosowania centrali tematycznych jest analiza notowań giełdowych pokazana poniżej:

Diagram przedstawiający centralę komunikatów tematycznych na przykładzie przetwarzania informacji giełdowych

Na diagramie możemy zobaczyć:

  • przykład filtrowania składającego się z 4 słów: gpw.notowania.spolki.kghm
  • przykład użycia „gwiazdki”, co pozwala na śledzenie wszystkich informacji związanych z notowaniami spółek na rynku New Connect warszawskiej GPW: gpw.new-connect.*
  • przykład użycia „płotka”, co pozwala śledzić wszystkie dane związane z warszawską Giełdą Papierów Wartościowych: gpw.#

Warto tutaj zwrócić uwagę na przemyślaną hierarchię struktury klucza routing’u, tak aby dodanie kolejnych funkcjonalności nie wiązało się z całkowitym jej przebudowaniem, a w przypadku dużych systemów może się to okazać bardzo trudne lub nawet niewykonalne.

Program notowania-gpw-nadawca.js

Podobnie jak w przypadku pozostałych przykładów programów-wydawców tak i tutaj bazujemy na przykładzie programu fabryka.js:

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    // nowe instrukcje
  });

  setTimeout(function() { conn.close(); process.exit(0) }, 500);
});

Deklarujemy nazwę centrali komunikatów tematycznych:

var ex = 'notowania-gpw';

a następnie odczytujemy dodatkowe parametry przesłane z linii poleceń:

var args = process.argv.slice(2);
var key = (args.length > 0) ? args[0] : 'gpw.info.anonymous';
var msg = args.slice(1).join(' ') || 'Raport skonsolidowany za IV kwartał 2015 roku!';

W przypadku ich braku do centrali komunikatów przesłana zostanie wiadomość „Raport skonsolidowany za IV kwartał 2015 roku!” z kluczem powiązania „gpw.info.anonymous”.

Kolejną rzeczą jest określenie typu centrali komunikatów i jej charakteru:

ch.assertExchange(ex, 'topic', {durable: false});

Na koniec wysyłamy wiadomość do kuriera wiadomości:

ch.publish(ex, key, new Buffer(msg));
console.log(" [x] Wysłano %s: '%s'", key, msg);

Kompletny program notowania-gpw-nadawca.js wygląda następująco:

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var ex = 'notowania-gpw';
    var args = process.argv.slice(2);
    var key = (args.length > 0) ? args[0] : 'gpw.info.anonymous';
    var msg = args.slice(1).join(' ') || 'Raport skonsolidowany za IV kwartał 2015 roku!';

    ch.assertExchange(ex, 'topic', {durable: false});
    ch.publish(ex, key, new Buffer(msg));
    console.log(" [x] Wysłano %s: '%s'", key, msg);
  });

  setTimeout(function() { conn.close(); process.exit(0) }, 500);
});

Program notowania-gpw-subskrybent.js

Program subskrybenta wygląda podobnie do programu subskrybenta logów dla centrali komunikatów typu direct.

Na samym początku sprawdzamy, czy użytkownik – uruchamiając program z terminala – podał wymagany co najmniej jeden klucz powiązania, a jeśli nie – wyświetlamy mu odpowiedni komunikat:

var args = process.argv.slice(2);

if (args.length == 0) {
  console.log("Korzystanie: notowania-gpw-subskrybent.js  ");
  console.log("  ./notowania-gpw-subskrybent.js gpw.notowania.spolki.kghm");
  console.log("  ./notowania-gpw-subskrybent.js gpw.new-connect.*");
  console.log("  ./notowania-gpw-subskrybent.js gpw.#");

  process.exit(1);
}

Następnie definiujemy nazwę centrali komunikatów i określamy jej typ:

var ex = 'notowania-gpw';

ch.assertExchange(ex, 'topic', {durable: false});

Na końcu przypinamy do centrali tymczasową kolejkę, która będzie powiązana z określonymi przez użytkownika kluczami routing'u:

ch.assertQueue('', {exclusive: true}, function(err, q) {
  console.log(" [*] Oczekiwanie na wiadomości.");
  console.log("     Naciśnij CTRL+C aby zakończyć.");

  args.forEach(function(key) {
    ch.bindQueue(q.queue, ex, key);
  });

  ch.consume(q.queue, function(msg) {
    console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
  }, {noAck: true});
});

Kompletny program notowania-gpw-nadawca.js wygląda następująco:

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

var args = process.argv.slice(2);

if (args.length == 0) {
  console.log("Korzystanie: notowania-gpw-subskrybent.js  ");
  console.log("  ./notowania-gpw-subskrybent.js gpw.notowania.spolki.kghm");
  console.log("  ./notowania-gpw-subskrybent.js gpw.new-connect.*");
  console.log("  ./notowania-gpw-subskrybent.js gpw.#");

  process.exit(1);
}

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var ex = 'notowania-gpw';

    ch.assertExchange(ex, 'topic', {durable: false});

    ch.assertQueue('', {exclusive: true}, function(err, q) {
      console.log(" [*] Oczekiwanie na wiadomości.");
      console.log("     Naciśnij CTRL+C aby zakończyć.");

      args.forEach(function(key) {
        ch.bindQueue(q.queue, ex, key);
      });

      ch.consume(q.queue, function(msg) {
        console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
      }, {noAck: true});
    });
  });
});

Przykładowe zastosowania

Centrale komunikatów typu topic można stosować przy:

  • dostarczaniu wiadomości do wybranych lokalizacji geograficznych, np. punktów sprzedaży;
  • przetwarzaniu procesów w tle z wykorzystaniem wielu aplikacji-robotników odpowiedzialnych za konkretne zestawy zadań;
  • śledzeniu notowań giełdowych (w tym różnych operacji finansowych, np. wzrost/spadek ceny akcji);
  • obsłudze aktualności, włączając ich kategoryzację i tagowanie (poprzez skupienie się np. na informacjach sportowych – piłkarskiej lidze w Hiszpanii: Primera División, czy wyników konkretnego zespołu, np. AS Roma);
  • zarządzaniu orkiestrowe (ang. computing orchestration) złożonymi systemami informatycznymi poprzez automatyczne przydzielanie zasobów, koordynację działania, kontrolę działania, pośredniczenie w wymianie wiadomości/informacji (w chmurze – ang. cloud);
  • systemach typu ciągłego dostarczania oprogramowania (ang. continuous deployment) tworzące tzw. buildy aplikacji w oparciu o różne parametry, gdzie pojedyncza aplikacja tzw. buildera jest zdolna do pracy tylko z jedną, konkretną konfiguracją:
    • różne architektury: np. x86/i586, AMD64/IA-64, ARM, MIPS, PowerPC, SPARC;
    • systemy operacyjne: np. Windows, Linuks, MacOS
    • wersje poszczególnych systemów operacyjnych: np. Windows 7, Windows 10, Debian 7, Ubuntu 14.04;
    • czy np. pod poszczególne dystrybucje Linuksa: np. Debian, RedHat, OpenSUSE, Ubuntu.

Repozytorium z kodem źródłowym

Kod programów omawianych w tym tutorialu znajduje się pod adresem:
https://github.com/RattiQue/tutorials-pl-node.js

Tomasz Kuter

Web Developer z ponad 8-letnim, komercyjnym doświadczeniem w tworzeniu stron i aplikacji internetowych oraz paneli administracyjnych w PHP, JavaScript, HTML i CSS.
Aktualnie zainteresowany architekturą mikroserwisów, które umożliwiają budowanie skalowalnych aplikacji internetowych.