воскресенье, 2 ноября 2014 г.

09. ZeroMQ: согласование работы между элементами сети.

(Начало здесь)

Согласование работы между узлами сети.

Если нужно согласовывать работы набора узлов в сети, то сокеты типа PAIR уже не так хороши.

Это как раз те области, где стратегии использования нитей и узлов различаются. В большинстве случаев узлы приходят и уходят "сами по себе", а нити обычно статичны. Сокеты типа PAIR не выполнят автоматическое переподключение, если удаленный узел сети уйдет, а потом появится снова. Другим существенным различием в применении узлов и применением нитей является то, что обычно мы имеем фиксированное число нитей , в то время как число рабочих узлов сети меняется.

Рассмотрим уже знакомый сценарий, реализующий схему "Запрос-Ответ" (с метостанцией - издателем и кучей клиентов-подписчиков) и попробуем координировать узлы так, чтобы быть уверенными в том, что при запуске подписчики-клиенты не потеряют данные.

Схема работы приложения:

  • Издатель (сервис метеостанции) заранее знает, сколько будет подписчиков. То есть, это просто волшебное число, которое он откуда-то получает.
  • Издатель запускается и ждет, пока подключатся все подписчики. Эта часть и есть процесс согласования. Каждый подписчик подписывается, а затем через другой сокет сообщает издателю, что он готов.
  • Когда к издателю подключатся все подписчики, он начинает публиковать данные.
В данном случае для согласования действий между подписчиком и издателем мы будем использовать пары сокетов REQ-REP. 


Поехали. Модифицируем код проекта "Метео". Сервис начинает публиковать данные только после коннекта 10 клиентов. Пакеты данных публикуются 10 000 раз, после чего публикуется сообщение 'END'.
Клиенты при запуске сообщают сервису о своем появлении и подписываются на данные сервиса.
Полученные по подписке данные выводятся в консоль клиента, число полученных пакетов подсчитывается, . Рабочий цикл клиента прерывается при получении сообщения 'END'.

Код сервиса:

program SynPS_Service;
{$APPTYPE CONSOLE}
// Меостанция
// Сервис - издатель с синхронизацией запуска

uses
  FastMM4, SysUtils, ZMQ_h, ZMQ_Utils;

const
  c_SUBSCRIBERS_EXPECTED = 10; // Ждем 10 подписчиков!
var
  fContext: Pointer;                // Send message from buffer, which we allocate and 0MQ will free for us
  fSocketPublisher: Pointer;
  fSocketSyncService: Pointer;
  fDummy: string;
  fMsgStr: string;
  fSubscribers: Integer = 0;
  fSndhwm: Integer = 1100000;
  i: Integer;
begin
  fContext := zmq_ctx_new();
  // Сокет для общения с клиенатами
  fSocketPublisher := zmq_socket(fContext, ZMQ_PUB);
  zmq_setsockopt(fSocketPublisher, ZMQ_SNDHWM, @fSndhwm, SizeOf(fSndhwm));
  zmq_bind(fSocketPublisher, 'tcp://*:5561');

  // Сокет для приема сигналов
  fSocketSyncService := zmq_socket(fContext, ZMQ_REP);
  zmq_bind(fSocketSyncService, 'tcp://*:5562');

 // Получение синхросигналов от подписчиков
  Writeln('Waiting for subscribers...');
  while fSubscribers < c_SUBSCRIBERS_EXPECTED do begin
  // - ожидание синхрозапроса
    s_recv(fSocketSyncService);
  // - отправка синхроответа
    s_send(fSocketSyncService, '');
    Inc(fSubscribers);
  end;

  // Теперь раздача 10 000 пакетов оповещений, а затем - отправка 'END'

  Randomize;

  for i := 0 to 9999 do begin
    Sleep(1); // Типа измеряет что-то
    // Температура
    fMsgStr := Format('Temperature : %d C', [20 - Random(40)]);
    s_send(fSocketPublisher, fMsgStr);

    // Атм. давление
    fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]);
    s_send(fSocketPublisher, fMsgStr);

    // Скорость ветра
    fMsgStr := Format('Wind : %d m/s', [Random(10)]);
    s_send(fSocketPublisher, fMsgStr);
  end;
  s_send(fSocketPublisher, 'END');
  Writeln('Publisher stopped...');
  zmq_close(fSocketPublisher);
  zmq_close(fSocketSyncService);
  zmq_ctx_destroy(fContext);
  Readln(fDummy);
end.




Появилось нечто новое:
var
...
  fSndhwm: Integer = 1100000;
...
begin
...
  zmq_setsockopt(fSocketPublisher, ZMQ_SNDHWM, @fSndhwm, SizeOf(fSndhwm));
...
Опция ZMQ_SNDHWM позволяет настроить исходящую пропускную способность сокета.
Связь "сокет-сокет" можно рассматривать как водопровод. Объем воды, который может вытекать и втекать, в общем случае разный: одн сокет может принимать данные от нескольких сокетов и наоборот. Значение максимально допустимого уровня воды называется "Уровень Высокой Воды" (High Water Mark). Для сокетов ZMQ значение определяет верхнюю границу максимального числа сообщений, которые могут накопиться в исходящей очереди сокета. Поговорим о Высокой Воде позднее.


Код клиента:

program SynPS_Client;

{$APPTYPE CONSOLE}
// Меостанция
// Клиент - подписчик с синхронизацией запуска
uses
  SysUtils, ZMQ_h, ZMQ_Utils;

var
  fContext: Pointer;
  fSocketSubscriber: Pointer;
  fSocketSyncClient: Pointer;
  fMsgStr: string;
  fCnt: Integer = 0;

const
  cFilter1 = 'Temperature';
  cFilter2 = 'Pressure';
  cFilter3 = 'Wind';
begin
  fContext := zmq_ctx_new(); // Инициализация
  // Сначала подключаем сокет подписчика
  fSocketSubscriber := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocketSubscriber, 'tcp://localhost:5561');
  zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, nil, 0);  // Настройка сокета
  Sleep(1);// ZMQ настолько шустрый, что нужно подождать...

  // Теперь синхронизируемся с издателем
  fSocketSyncClient := zmq_socket(fContext, ZMQ_REQ);
  zmq_connect(fSocketSyncClient, 'tcp://localhost:5562');
  s_send(fSocketSyncClient, ''); // Отправка сообщения о готовности
  s_recv(fSocketSyncClient); // Ожидание подтверждения

  Writeln('Subscriber started...');

  while True do begin
    fMsgStr := s_recv(fSocketSubscriber); // Прием данных
    if fMsgStr = 'END' then
      Break;
    Writeln(fMsgStr);
    Inc(fCnt)
  end;
  Writeln('Received ', fCnt, ' updates');

  zmq_close(fSocketSubscriber);
  zmq_close(fSocketSyncClient);
  zmq_ctx_destroy(fContext);
  Readln;
end.



Мы не можем быть уверены, что коннект SUB будет завершен к тому времени, когда завершится диалог REQ/REP. Вообще нет никакой гарантии того, что исходящие соединения завершатся в том или ином порядке, если вы используете любой транспорт за исключением InProc.
Ну, в примере мы воткнули ожидание (sleep(1)) после подпиской и синхропосылками REQ/REP.
Что как бы работает, но тоже в общем случае не гарантирует.

Более надежная схема могла бы выглядеть так:
  • Издатель открывает PUB - сокет и начинает передавать сообщения "Hello"(без данных).
  • Подписчик подключает SUB - сокет и, когда тот принимает сообщение "Hello", то сообщает об этом издателю через пару сокетов REQ/REP.
  • Когда издатель получит необходимое число подтверждений от коннекте от подписчиков, он начинает публиковать реальные данные.


Дальше поговорим о такой чудесной вещи, как Нуль-Копия: (Продолжение).

Комментариев нет :

Отправить комментарий