воскресенье, 2 ноября 2014 г.

11. ZeroMQ: фильтрация входящих данных для схемы "Издатель - Подписчик" по первому кадру сообщения.


(Начало - здесь)

Вернемся к схеме "Издатель - Подписчик". (Приложение "Метеостанция").
Вспомним, что при подписке данные можно фильтровать.

Однако, фильтрация по самим данным не всегда удобна. Куда удобнее фильтровать по значению ключевого поля, связанному с данными.



Например, пусть будут такие ключи:
  • T - Температура
  • P - Давление
  • W - Скорость ветра
Это гораздо удобнее. Например, ключу W можно сопоставить не только скорость ветра, но и направление ветра.
Или данные о скорости ветра разбить на две подгруппы:
  • WS - ветер до 5 м/с;
  • WF - ветер 5 м/с и более.
Такая схема фильтрации реализуется очень просто: на сервере нужно формировать составное сообщение:
    // Температура
    fMsgStr := Format('Temperature : %d C', [20 - Random(40)]);
    s_send(fSocketPublisher, 'T', ZMQ_SNDMORE); // 1я часть - ключ
    s_send(fSocketPublisher, fMsgStr);// 2я часть - тело сообщения



А клиент, при оформлении подписки, должен указать фильтр:
const
  cFilter1: string = 'T';
  cFilter2: string = 'P';
  cFilter3: string = 'W';
begin
  fContext := zmq_ctx_new(); // Инициализация
  // Подключаем сокет подписчика
  fSocketSubscriber := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocketSubscriber, 'tcp://localhost:5561');
  zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, PChar(cFilter1), Length(cFilter1) * SizeOf(Char)); // Настройка сокета

При этом сообщение будет фильтроваться по первому кадру, а приходить отфильтрованное сообщение будет полностью.

Сервис "Метео", составное сообщение:
program EnvPS_Service;
{$APPTYPE CONSOLE}
// Метеостанция

uses
  FastMM4, SysUtils, ZMQ_h, ZMQ_Utils;

var
  fContext: Pointer;
  fSocketPublisher: Pointer;
  fDummy: string;
  fMsgStr: string;
  i: Integer;
begin
  fContext := zmq_ctx_new();
  // Сокет для общения с клиенатами
  fSocketPublisher := zmq_socket(fContext, ZMQ_PUB);
  zmq_bind(fSocketPublisher, 'tcp://*:5561');

  Writeln('Press Enter,  please, when all subscribers will be ready...');
  Readln;

  // Теперь раздача 100 собщений
  Randomize;

  for i := 0 to 99 do begin
    Sleep(1); // Типа измеряет что-то
    // Температура
    fMsgStr := Format('Temperature : %d C', [20 - Random(40)]);
    s_send(fSocketPublisher, 'T', ZMQ_SNDMORE); // 1я часть - ключ
    s_send(fSocketPublisher, fMsgStr);// 2я часть - тело сообщения

    // Атм. давление
    fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]);
    s_send(fSocketPublisher, 'P', ZMQ_SNDMORE);
    s_send(fSocketPublisher, fMsgStr);

    // Скорость ветра
    fMsgStr := Format('Wind : %d m/s', [Random(10)]);
    s_send(fSocketPublisher, 'W', ZMQ_SNDMORE);
    s_send(fSocketPublisher, fMsgStr);
  end;
  Writeln('Publisher stopped...');
  zmq_close(fSocketPublisher);
  zmq_ctx_destroy(fContext);
  Readln(fDummy);
end.



Клиент "Метео", составное сообщение
program EnvPS_Client;

{$APPTYPE CONSOLE}
// Метеостанция
uses
  SysUtils, ZMQ_h, ZMQ_Utils;

var
  fContext: Pointer;
  fSocketSubscriber: Pointer;
  fPrefix: string;
  fBody: string;
  fCnt: Integer = 0;

const
  cFilter1: string = 'T';
  cFilter2: string = 'P';
  cFilter3: string = 'W';
begin
  fContext := zmq_ctx_new(); // Инициализация
  // Подключаем сокет подписчика
  fSocketSubscriber := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocketSubscriber, 'tcp://localhost:5561');
  zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, PChar(cFilter1), Length(cFilter1) * SizeOf(Char)); // Настройка сокета
  Writeln('Subscriber started...');

  while True do begin
    fPrefix := s_recv(fSocketSubscriber); // Прием данных, префикс
    fBody := s_recv(fSocketSubscriber); // Прием данных, тело
    Writeln(fPrefix, ' ', fBody);
    Inc(fCnt);
    if fCnt > 10 then
      break
  end;
  Writeln('Received ', fCnt, ' updates');

  zmq_close(fSocketSubscriber);
  zmq_ctx_destroy(fContext);
  Readln;
end.



Запускам, видим, что получаем данные только о температуре:

Разбиение сообщения на части удобно, в том числе, например, для логического разделения составных данных.Например: Ключ - Адрес - Основное сообщение.

Поговорим, наконец, о вотермарках. (Продолжение).

Комментариев нет :

Отправить комментарий