суббота, 1 ноября 2014 г.

05. ZeroMQ: поллинг. Опрашиваем готовность сразу нескольких сокетов. Метод zmq_poll().


Работа с несколькими сокетами ZMQ.

Во всех примерах, что мы рассмотрели ранее, схема работы была примерно одинаковой - в цикле повторялись одни те же действия:
    

    Ожидание сообщение из сокета
    Обработка сообщения.
    "Смыть, повторить."

Что делать, если нам нужно читать сообщения из нескольких конечных точек одновременно?
Самое простое - коннектить один сокет ко всем нужным конечным точкам и позволить, чтобы ZeroMQ втягивала нужные нам данные. Это допустимо, если все конечные точки настроены для работы по одному и тому же шаблону, но может быть и недопустмо, если, например, коннектить PULL - сокеты к конечной точке типа PUB.

Так вот, чтобы иметь возможность читать из множества сокетов сразу, следует использовать метод zmq_poll().


Более того, рекомендуется обернуть zmq_poll() в удобный для ситуации собственный фреймворк.

Создадим простое приложение, которое покажет, как читать данные из неблокирующего сокета.
Будем читать данные из двух сокетов с использованием неблокирующего чтения. Приложение будет работать и как подписчик, и как работник в системе обработки параллельных задач. Вот оно:
program MS_Reader;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ_h;

var

  fConext: Pointer;
  fSocketReceiver: Pointer;
  fSocketSubscriber: Pointer;
  fMsgBuff: array[0..255] of Char;
  fSize: Integer;
begin

// Подключение к задаче "ventilator"
  fConext := zmq_ctx_new();
  fSocketReceiver := zmq_socket(fConext, ZMQ_PULL);
  zmq_connect(fSocketReceiver, 'tcp://localhost:5557');

// Подключение к серверу метеостанции
  fSocketSubscriber := zmq_socket(fConext, ZMQ_SUB);
  zmq_connect(fSocketSubscriber, 'tcp://localhost:5556');
  zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, PChar('10001 '), 6);

// Процесс обработки для обоих сокетов
// Считаем, что трафик от  задачи "ventilator" для нас важнее
  while true do begin
    while true do begin
      fSize := zmq_recv(fSocketReceiver, @fMsgBuff, SizeOf(fMsgBuff), ZMQ_DONTWAIT);
      if fSize >= 0 then
        // Выполнение задания
      else
        break;
    end;
    while true do begin
      fSize := zmq_recv(fSocketSubscriber, @fMsgBuff, SizeOf(fMsgBuff), ZMQ_DONTWAIT);
      if fSize >= 0 then
    // Обработка данных о погоде
      else
        break;
    end;
    // Активности нет, спим 1 ms
    Sleep(1);
  end;
  zmq_close(fSocketReceiver);
  zmq_close(fSocketSubscriber);
  zmq_ctx_destroy(fConext);

  Readln;

end.
 

Недостатком такого способа является некоторая дополнительная задержка на обработке первого сообщения (sleep(1) в конце цикла, когда нет никаких сообщений в процессе). Это может оказаться проблемой в приложениях, где задержка в миллисекунды имеет жизненно важное значение. Кроме того, необходимо быть уверенным, что sleep () или любая другая функция задержки не жрет процессорное время.

А теперь рассмотрим такое же приложение, но уже с использованием zmq_poll().
program MS_Reader_Poll;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  ZMQ_h;

var

  fConext: Pointer;
  fSocketReceiver: Pointer;
  fSocketSubscriber: Pointer;
  fMsgBuff: array[0..255] of Char;
  fSize: Integer;
  fItems: array[0..1] of zmq_pollitem_t;
begin

// Подключение к задаче "ventilator"
  fConext := zmq_ctx_new();
  fSocketReceiver := zmq_socket(fConext, ZMQ_PULL);
  zmq_connect(fSocketReceiver, 'tcp://localhost:5557');

// Подклчение к серверу метеостанции
  fSocketSubscriber := zmq_socket(fConext, ZMQ_SUB);
  zmq_connect(fSocketSubscriber, 'tcp://localhost:5556');
  zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, PChar('Temperature '), 12);

  fItems[0].socket := fSocketReceiver;
  fItems[0].fd := 0;
  fItems[0].events := ZMQ_POLLIN;
  fItems[0].revents := 0;

  fItems[1].socket := fSocketSubscriber;
  fItems[1].fd := 0;
  fItems[1].events := ZMQ_POLLIN;
  fItems[1].revents := 0;
// Процесс обработки для обоих сокетов
  while true do begin

    zmq_poll(@fItems[0], Length(fItems), -1);
    if ((fItems[0].revents and ZMQ_POLLIN) <> 0) then begin
      fSize := zmq_recv(fSocketReceiver, @fMsgBuff, SizeOf(fMsgBuff), 0);
      if fSize >= 0 then
        // Выполнение задания
      else
        break;
      if ((fItems[1].revents and ZMQ_POLLIN) <> 0) then begin
        fSize := zmq_recv(fSocketSubscriber, @fMsgBuff, SizeOf(fMsgBuff), 0);
        if fSize >= 0 then
    // Обработка данных о погоде
        else
          break;
      end;
    end;
  end;

  zmq_close(fSocketReceiver);
  zmq_close(fSocketSubscriber);
  zmq_ctx_destroy(fConext);

  Readln;

end. 

Видим, что появилась интересная структура:
type
  p_zmq_pollitem_t = ^zmq_pollitem_t;
  zmq_pollitem_t = packed record
    socket: Pointer; // Сокет ZMQ
    fd: Integer; // Стандартный сокет, представленный дескриптором файла
    events: SHORT; // Маска интересующих событий
    revents: SHORT; // Маска произошедших событий
  end;


Массив таких структур
  fItems: array[0..1] of zmq_pollitem_t;

Инициализируется следующим образом:
  fItems[0].socket := fSocketReceiver;
  fItems[0].fd := 0;
  fItems[0].events := ZMQ_POLLIN;
  fItems[0].revents := 0;

  fItems[1].socket := fSocketSubscriber;
  fItems[1].fd := 0;
  fItems[1].events := ZMQ_POLLIN;
  fItems[1].revents := 0;

В каждом элементе указан свой сокет и задана маска ожидаемых событий (ZMQ_POLLIN).
Далее вызывается метод:
    zmq_poll(@fItems[0], Length(fItems), -1);
Первый параметр - указатель на начало массива структур zmq_pollitem_t, второй - число элементов массива. Последний параметр - время ожидания события в ms. Если -1 - то ожидание будет выполняться бесконечно долго. Событие, произошедшее при выполнении zmq_poll(), заносится в битовую маску revents.
PS: Честно говоря, не въехал, как использовать поле fd. Что "за стандартный сокет"? :(

PPS: Благодаря уважаемой Хильде (см. комментарий Hilda Slyusar28 февраля 2016 г., 10:08), появилось понимание, что есть "стандартный сокет". Так вот, fd - дескриптор сокета из библиотеки сокетов используемых протоколов. Для MS Windows (линии NT), например, доступ к tcp/udp сокетам реализован в системной библиотеке сокетов, интерфейс к которой объявлен в winsock2.h. При использовании такого "низкоуровнего" сокета в структуре zmq_pollitem_t, полю socket следует присвоить значение NULL(nil для Delphi), полю fd - значение дескриптора "низкоуровнего" сокета, остальные элементы -  так же, как в примере выше.

Дальше поговорим о том, как элементам сети найти друг друга. (Продолжение).

2 комментария :

  1. Спасибо за Ваши статьи! Очень подробно написано и помогает еще раз переструктурировать знания.

    Для создания хартбитинга использовала отправку и получение сообщений через UDP. Для того, чтобы zmq_poll получал эти данные - использовала как раз параметр fd. То есть socket = NULL, fd = обычный сокет, созданный стандартными средставами (socket() в с++), остальные два параметра остаются прежними - ZMQ_POLLIN и 0.

    ОтветитьУдалить