пятница, 31 октября 2014 г.

02. ZeroMQ: схема "Издатель - Подписчик" (Publisher-Subscriber).

Задача: (начало здесь)


Автоматическая метеостанция измеряет температуру, атмосферное давление и скорость ветра. Результаты измерений время от времени (например, после завершения цикла измерений) передаются "всем заинтересованным лицам".
Кого-то интересует всё, кому-то нужна температура, кого-то волнует только скорость ветра.




Пишем код метеостанции (), то есть, сервер-издатель.


Код сервера: 



program PS_Server;
{$APPTYPE CONSOLE}
uses
  SysUtils, ZMQ_h;


var
  fContext: Pointer;
  fSocket: Pointer;
  fDummy: string;
  fMsgStr: string;
  fMessage: zmq_msg_t;

begin
  fContext := zmq_ctx_new();
  fSocket := zmq_socket(fContext, ZMQ_PUB);
  zmq_bind(fSocket, 'tcp://*:4040');
  Writeln('Publisher started...');
  Randomize;

  while True do begin
    Sleep(100); // Типа измеряет что-то
    // Температура
    fMsgStr := Format('Temperature : %d C', [20 - Random(40)]);
    zmq_msg_init(@fMessage); // Инициализация zmq_msg_t
    zmq_msg_init_size(@fMessage, Length(fMsgStr) * SizeOf(Char)); // Резервирование памяти
    // Копирование данных из строки в буфер сообщения:
    move(PChar(fMsgStr)^, zmq_msg_data(@fMessage)^, Length(fMsgStr) * SizeOf(Char));
    zmq_msg_send(@fMessage, fSocket, 0); // Пересылка
    zmq_msg_close(@fMessage); // Всё

    // Атм. давление
    fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]);
    zmq_msg_init(@fMessage);
    zmq_msg_init_size(@fMessage, Length(fMsgStr) * SizeOf(Char));
    move(PChar(fMsgStr)^, zmq_msg_data(@fMessage)^, Length(fMsgStr) * SizeOf(Char));
    zmq_sendmsg(fSocket, @fMessage, 0);
    zmq_msg_close(@fMessage);

    // Скорость ветра
    fMsgStr := Format('Wind : %d m/s', [Random(10)]);
    zmq_send(fSocket, PChar(fMsgStr), Length(fMsgStr) * SizeOf(Char), 0);
  end;
  zmq_ctx_destroy(fContext);
  Readln(fDummy);
end.

Инициализация: все то же самое, что и для работы по шаблону "Запрос - Ответ". За исключением того сокет создается с опцией ZMQ_PUB (сокет - издатель):
zmq_socket(fContext, ZMQ_PUB);

Рабочий цикл: сервер только отправляет сообщения подписчикам (методы zmq_msg_send, zmq_sendmsg, zmq_send). Попытка получить данные выбросит исключение. .... В каждом рабочем цикле отправляется три сообщения (вернее, "публикуется"): о температуре, о давлении и о скорости ветра. Для разнообразия показаны разные способы формирования сообщений.

При публикации температуры и давления используется структура типа zmq_msg_t:
fMessage: zmq_msg_t;
Служебная структура, ничего особенного в ней нет, просто массив из 48 байт.
type
  zmq_msg_t = packed record
    _: array[0..47] of Byte;
  end;

Удивительно, но 48 байт стало только на днях, в версии ZMQ 4.1.0. До этого всегда было 32. То есть, новые программы будут несовместимы со старыми библиотеками, и наоборот. 

Чтобы переслать данные, нужно попросить библиотеку ZMQ инициализировать область памяти нужного размера. А затем заполнить эту область:

    zmq_msg_init(@fMessage); // Инициализация zmq_msg_t
    zmq_msg_init_size(@fMessage, Length(fMsgStr) * SizeOf(Char)); // Резервирование памяти
    // Копирование данных из строки в буфер сообщения:
    move(PChar(fMsgStr)^, zmq_msg_data(@fMessage)^, Length(fMsgStr) * SizeOf(Char));

А затем - отправить сообщение, используя метод zmq_msg_send или zmq_sendmsg:

    zmq_msg_send(@fMessage, fSocket, 0); // Пересылка

или

    zmq_sendmsg(fSocket, @fMessage, 0);

... В нашем случае мы уже имеем готовый буфер с данными - строку fMsgStr:

    // Атм. давление
    fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]);

Можно не заморачиваться с zmq_msg_t и сразу вызвать метод zmq_send:
    zmq_send(fSocket, PChar(fMsgStr)^, Length(fMsgStr) * SizeOf(Char), 0); 
Тогда сервер - издатель получается еще короче:
program PS_Server;
{$APPTYPE CONSOLE}
uses
  SysUtils, ZMQ;

var
  fContext: Pointer;
  fSocket: Pointer;
  fDummy: string;
  fMsgStr: string;

begin
  fContext := zmq_ctx_new();
  fSocket := zmq_socket(fContext, ZMQ_PUB);
  zmq_bind(fSocket, 'tcp://*:4040');
  Writeln('Publisher started...');
  Randomize;

  while True do begin
    Sleep(100); // Типа измеряет что-то
    // Температура
    fMsgStr := Format('Temperature : %d C', [20 - Random(40)]);
    zmq_send(fSocket, PChar(fMsgStr)^, Length(fMsgStr) * SizeOf(Char), 0);

    // Атм. давление
    fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]);
    zmq_send(fSocket, PChar(fMsgStr)^, Length(fMsgStr) * SizeOf(Char), 0);

    // Скорость ветра
    fMsgStr := Format('Wind : %d m/s', [Random(10)]);
    zmq_send(fSocket, PChar(fMsgStr)^, Length(fMsgStr) * SizeOf(Char), 0);
  end;
  zmq_ctx_destroy(fContext);
  Readln(fDummy);
end.

Еще раз: сервер - издатель публикует данные асинхронно, ему в общем случае начхать на клиентов - подписчиков. То есть, если клиента запустить после запуска сервера, то все , что было опубликовано ранее, теряется. Поэтому, вероятно, имеет смысл запускать сначала подписчиков, а потом уже сервер - издатель.
 ...
 ~~~~~~~~~~~~~~~~~~~~

Теперь разберёмся с клиентами (подписчиками).

Код клиента:

program PS_Client;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ_h;

var
  fContext: Pointer;
  fSocket: Pointer;
  fMessage: zmq_msg_t;
  fDummy: string;
  fMsgStr: string;
  fLen: Integer;

begin
  fContext := zmq_ctx_new(); // Инициализация
  fSocket := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocket, 'tcp://localhost:4040');
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nil, 0); // Настройка сокета
  Writeln('Subscriber started...');

  while True do begin
    zmq_msg_init(@fMessage);
    fLen :=  zmq_msg_recv(@fMessage, fSocket, 0); // Прием данных
    SetLength(fMsgStr, fLen div SizeOf(Char)); // Формирование буфера строки
    Move(zmq_msg_data(@fMessage)^, PChar(fMsgStr)^, fLen); // Копирвоание данных
    Writeln(fMsgStr);
    zmq_msg_close(@fMessage);
  end;
  zmq_ctx_destroy(fContext);
  Readln;
end.

Запускаем несколько клиентов, потом стартуем сервер и наблюдаем, что все гладко и сладко:



Эти тупые клиенты-подписчики реагируют на все публикации.
А хорошо бы, одни чтобы реагировали на температуру, другие - на ветер, третьи - на что-нибудь еще.
Как выяснилось, это сделать несложно. Клиент, настраивая сокет - подписчик, должен указать в параметрах строку фильтра.
Вместо:
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nil, 0); // Настройка сокета 
- следует указать, например 'Temperature':
const
  cFilter1 = 'Temperature';
  cFilter2 = 'Pressure';
  cFilter3 = 'Wind';
begin
  fContext := zmq_ctx_new(); // Инициализация
  fSocket := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocket, 'tcp://localhost:4040');
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, PChar(cFilter1), SizeOf(cFilter1)); // Настройка сокета

Теперь этот подписчик получит только те сообщения, которые начинаются с 'Temperature':

Фильтров может быть добавлено несколько. В этом случае клиент получит только те сообщения, которые соответствуют любому из фильтров:
const
  cFilter1 = 'Temperature';
  cFilter2 = 'Pressure';
  cFilter3 = 'Wind';
begin
  fContext := zmq_ctx_new(); // Инициализация
  fSocket := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocket, 'tcp://localhost:4040');
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, PChar(cFilter1), SizeOf(cFilter1)); // Настройка сокета
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, PChar(cFilter2), SizeOf(cFilter2)); // Настройка сокета


Таким образом, получаем сообщения только о температура и о давлении:


 Замечания.
1. Сообщения фильтруются клиентом - подписчиком, не сервером-издателем. Так написано. То есть, клиент на транспортном уровне вроде бы получает все поступающие сообщения. Как мне кажется, это должно здорово снижать производительность системы в случае, когда сообщения длинные и поступают часто. Однако, тесты "на коленке" (много разных "длинных" сообщений + фильтр) показали, что это не так. Возможно, при включенной фильтрации клиент получает не сообщение целиком, а только его начальную часть, для сравнения с фильтром. Если сообщение "не годится", оставшаяся часть не принимается.
Возможно.

2. Сервер публикует сообщения когда нравится (т.е., асинхронно с клиентом), а клиент-подписчик читает их по готовности, начиная с момента коннекта к серверу-издателю. Таким образом, если клиент обрабатывает сообщения слишком медленно, очередь сообщений может стать слишком большой, и может случиться беда. ZeroMQ обрабатывает данную ситуацию в зависимости от того, как вы настроите сокет-подписчик и сокет-издатель.

Каждое соединение между исходящим сокетом (на сервере) и входящем (на клиенте) реализуется с помощью т.н. "труб" (pipes).
То есть, между сокетом - источником сообщения и сокетом - получателем создаются "трубы" (pipes), по которым "текут" сообщения.

Можно задать "объём" трубы как на источнике, так и на получателе. То есть, "максимально допустимый уровень воды"(high-water mark - HWM).

Некоторые сокеты (типа PUB, PUSH) имеют только исходящие буферы, для них можно определить HWM на отправление.
Если сокеты принимающие (типа SUB, PULL, REQ, REP), то для них можно определить определить HWM на прием.
Есть сокеты, которые работают в обе стороны (DEALER, ROUTER, PAIR), для этих можно определить оба значения HWM.

Так вот, уровень HWM задается все тем же методом zmq_setsockopt(), с опциями:

    ZMQ_SNDHWM: - задать high water mark для исходящих сообщений (для сокета - издателя)
    ZMQ_RCVHWM: - задать high water mark для входящих сообщений (для сокета - подписчика)

По умолчанию HWM для обоих типов равен 1000 сообщений. Если буфер заполнен, то, в зависимости от типа сокета, оставшиеся сообщения либо игнорируются, либо выполняется блокировка процесса.
Пишут, что ZeroMQ не гарантирует, что сокет сможет принять столько сообщений, сколько указано при установке ZMQ_SNDHWM,  реально граница может быть на уровне 60-70% от заданного HWM. Типа, "рекомендация" для системы.

Итак, в дополнение к схеме "Запрос/Ответ" мы разобрали схему "Издатель/Подписчик" вот с такой топологией сети:




Теперь попробуем разобраться с более удивительными вещами. Продолжение.

Комментариев нет :

Отправить комментарий