Работа с несколькими сокетами ZMQ.
Во всех примерах, что мы рассмотрели ранее, схема работы была примерно одинаковой - в цикле повторялись одни те же действия:
Ожидание сообщение из сокета
Обработка сообщения.
"Смыть, повторить."
Что делать, если нам нужно читать сообщения из нескольких конечных точек одновременно?
Самое простое - коннектить один сокет ко всем нужным конечным точкам и позволить, чтобы ZeroMQ втягивала нужные нам данные. Это допустимо, если все конечные точки настроены для работы по одному и тому же шаблону, но может быть и недопустмо, если, например, коннектить PULL - сокеты к конечной точке типа PUB.
Так вот, чтобы иметь возможность читать из множества сокетов сразу, следует использовать метод zmq_poll().
Более того, рекомендуется обернуть zmq_poll() в удобный для ситуации собственный фреймворк.
Создадим простое приложение, которое покажет, как читать данные из неблокирующего сокета.
Будем читать данные из двух сокетов с использованием неблокирующего чтения. Приложение будет работать и как подписчик, и как работник в системе обработки параллельных задач. Вот оно:
program MS_Reader; {$APPTYPE CONSOLE} uses SysUtils, ZMQ_h; var fConext: Pointer; fSocketReceiver: Pointer; fSocketSubscriber: Pointer; fMsgBuff: array[0..255] of Char; fSize: Integer; begin // Подключение к задаче "ventilator" fConext := zmq_ctx_new(); fSocketReceiver := zmq_socket(fConext, ZMQ_PULL); zmq_connect(fSocketReceiver, 'tcp://localhost:5557'); // Подключение к серверу метеостанции fSocketSubscriber := zmq_socket(fConext, ZMQ_SUB); zmq_connect(fSocketSubscriber, 'tcp://localhost:5556'); zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, PChar('10001 '), 6); // Процесс обработки для обоих сокетов // Считаем, что трафик от задачи "ventilator" для нас важнее while true do begin while true do begin fSize := zmq_recv(fSocketReceiver, @fMsgBuff, SizeOf(fMsgBuff), ZMQ_DONTWAIT); if fSize >= 0 then // Выполнение задания else break; end; while true do begin fSize := zmq_recv(fSocketSubscriber, @fMsgBuff, SizeOf(fMsgBuff), ZMQ_DONTWAIT); if fSize >= 0 then // Обработка данных о погоде else break; end; // Активности нет, спим 1 ms Sleep(1); end; zmq_close(fSocketReceiver); zmq_close(fSocketSubscriber); zmq_ctx_destroy(fConext); Readln; end.
Недостатком такого способа является некоторая дополнительная задержка на обработке первого сообщения (sleep(1) в конце цикла, когда нет никаких сообщений в процессе). Это может оказаться проблемой в приложениях, где задержка в миллисекунды имеет жизненно важное значение. Кроме того, необходимо быть уверенным, что sleep () или любая другая функция задержки не жрет процессорное время.
А теперь рассмотрим такое же приложение, но уже с использованием zmq_poll().
program MS_Reader_Poll; {$APPTYPE CONSOLE} uses SysUtils, ZMQ_h; var fConext: Pointer; fSocketReceiver: Pointer; fSocketSubscriber: Pointer; fMsgBuff: array[0..255] of Char; fSize: Integer; fItems: array[0..1] of zmq_pollitem_t; begin // Подключение к задаче "ventilator" fConext := zmq_ctx_new(); fSocketReceiver := zmq_socket(fConext, ZMQ_PULL); zmq_connect(fSocketReceiver, 'tcp://localhost:5557'); // Подклчение к серверу метеостанции fSocketSubscriber := zmq_socket(fConext, ZMQ_SUB); zmq_connect(fSocketSubscriber, 'tcp://localhost:5556'); zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, PChar('Temperature '), 12); fItems[0].socket := fSocketReceiver; fItems[0].fd := 0; fItems[0].events := ZMQ_POLLIN; fItems[0].revents := 0; fItems[1].socket := fSocketSubscriber; fItems[1].fd := 0; fItems[1].events := ZMQ_POLLIN; fItems[1].revents := 0; // Процесс обработки для обоих сокетов while true do begin zmq_poll(@fItems[0], Length(fItems), -1); if ((fItems[0].revents and ZMQ_POLLIN) <> 0) then begin fSize := zmq_recv(fSocketReceiver, @fMsgBuff, SizeOf(fMsgBuff), 0); if fSize >= 0 then // Выполнение задания else break; if ((fItems[1].revents and ZMQ_POLLIN) <> 0) then begin fSize := zmq_recv(fSocketSubscriber, @fMsgBuff, SizeOf(fMsgBuff), 0); if fSize >= 0 then // Обработка данных о погоде else break; end; end; end; zmq_close(fSocketReceiver); zmq_close(fSocketSubscriber); zmq_ctx_destroy(fConext); Readln; end.
Видим, что появилась интересная структура:
type p_zmq_pollitem_t = ^zmq_pollitem_t; zmq_pollitem_t = packed record socket: Pointer; // Сокет ZMQ fd: Integer; // Стандартный сокет, представленный дескриптором файла events: SHORT; // Маска интересующих событий revents: SHORT; // Маска произошедших событий end;
Массив таких структур
fItems: array[0..1] of zmq_pollitem_t;
Инициализируется следующим образом:
fItems[0].socket := fSocketReceiver; fItems[0].fd := 0; fItems[0].events := ZMQ_POLLIN; fItems[0].revents := 0; fItems[1].socket := fSocketSubscriber; fItems[1].fd := 0; fItems[1].events := ZMQ_POLLIN; fItems[1].revents := 0;
В каждом элементе указан свой сокет и задана маска ожидаемых событий (ZMQ_POLLIN).
Далее вызывается метод:
zmq_poll(@fItems[0], Length(fItems), -1);Первый параметр - указатель на начало массива структур zmq_pollitem_t, второй - число элементов массива. Последний параметр - время ожидания события в ms. Если -1 - то ожидание будет выполняться бесконечно долго. Событие, произошедшее при выполнении zmq_poll(), заносится в битовую маску revents.
PS: Честно говоря, не въехал, как использовать поле fd. Что "за стандартный сокет"? :(
PPS: Благодаря уважаемой Хильде (см. комментарий Hilda Slyusar28 февраля 2016 г., 10:08), появилось понимание, что есть "стандартный сокет". Так вот, fd - дескриптор сокета из библиотеки сокетов используемых протоколов. Для MS Windows (линии NT), например, доступ к tcp/udp сокетам реализован в системной библиотеке сокетов, интерфейс к которой объявлен в winsock2.h. При использовании такого "низкоуровнего" сокета в структуре zmq_pollitem_t, полю socket следует присвоить значение NULL(nil для Delphi), полю fd - значение дескриптора "низкоуровнего" сокета, остальные элементы - так же, как в примере выше.
Дальше поговорим о том, как элементам сети найти друг друга. (Продолжение).
Спасибо за Ваши статьи! Очень подробно написано и помогает еще раз переструктурировать знания.
ОтветитьУдалитьДля создания хартбитинга использовала отправку и получение сообщений через UDP. Для того, чтобы zmq_poll получал эти данные - использовала как раз параметр fd. То есть socket = NULL, fd = обычный сокет, созданный стандартными средставами (socket() в с++), остальные два параметра остаются прежними - ZMQ_POLLIN и 0.
Спасибо за пояснение!
Удалить