Работа с несколькими сокетами ZMQ.
Во всех примерах, что мы рассмотрели ранее, схема работы была примерно одинаковой - в цикле повторялись одни те же действия:
Ожидание сообщение из сокета
Обработка сообщения.
"Смыть, повторить."
Что делать, если нам нужно читать сообщения из нескольких конечных точек одновременно?
Самое простое - коннектить один сокет ко всем нужным конечным точкам и позволить, чтобы ZeroMQ втягивала нужные нам данные. Это допустимо, если все конечные точки настроены для работы по одному и тому же шаблону, но может быть и недопустмо, если, например, коннектить PULL - сокеты к конечной точке типа PUB.
Так вот, чтобы иметь возможность читать из множества сокетов сразу, следует использовать метод zmq_poll().
Более того, рекомендуется обернуть zmq_poll() в удобный для ситуации собственный фреймворк.
Создадим простое приложение, которое покажет, как читать данные из неблокирующего сокета.
Будем читать данные из двух сокетов с использованием неблокирующего чтения. Приложение будет работать и как подписчик, и как работник в системе обработки параллельных задач. Вот оно:
program MS_Reader;
{$APPTYPE CONSOLE}
uses
SysUtils, ZMQ_h;
var
fConext: Pointer;
fSocketReceiver: Pointer;
fSocketSubscriber: Pointer;
fMsgBuff: array[0..255] of Char;
fSize: Integer;
begin
// Подключение к задаче "ventilator"
fConext := zmq_ctx_new();
fSocketReceiver := zmq_socket(fConext, ZMQ_PULL);
zmq_connect(fSocketReceiver, 'tcp://localhost:5557');
// Подключение к серверу метеостанции
fSocketSubscriber := zmq_socket(fConext, ZMQ_SUB);
zmq_connect(fSocketSubscriber, 'tcp://localhost:5556');
zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, PChar('10001 '), 6);
// Процесс обработки для обоих сокетов
// Считаем, что трафик от задачи "ventilator" для нас важнее
while true do begin
while true do begin
fSize := zmq_recv(fSocketReceiver, @fMsgBuff, SizeOf(fMsgBuff), ZMQ_DONTWAIT);
if fSize >= 0 then
// Выполнение задания
else
break;
end;
while true do begin
fSize := zmq_recv(fSocketSubscriber, @fMsgBuff, SizeOf(fMsgBuff), ZMQ_DONTWAIT);
if fSize >= 0 then
// Обработка данных о погоде
else
break;
end;
// Активности нет, спим 1 ms
Sleep(1);
end;
zmq_close(fSocketReceiver);
zmq_close(fSocketSubscriber);
zmq_ctx_destroy(fConext);
Readln;
end.
Недостатком такого способа является некоторая дополнительная задержка на обработке первого сообщения (sleep(1) в конце цикла, когда нет никаких сообщений в процессе). Это может оказаться проблемой в приложениях, где задержка в миллисекунды имеет жизненно важное значение. Кроме того, необходимо быть уверенным, что sleep () или любая другая функция задержки не жрет процессорное время.
А теперь рассмотрим такое же приложение, но уже с использованием zmq_poll().
program MS_Reader_Poll;
{$APPTYPE CONSOLE}
uses
SysUtils,
ZMQ_h;
var
fConext: Pointer;
fSocketReceiver: Pointer;
fSocketSubscriber: Pointer;
fMsgBuff: array[0..255] of Char;
fSize: Integer;
fItems: array[0..1] of zmq_pollitem_t;
begin
// Подключение к задаче "ventilator"
fConext := zmq_ctx_new();
fSocketReceiver := zmq_socket(fConext, ZMQ_PULL);
zmq_connect(fSocketReceiver, 'tcp://localhost:5557');
// Подклчение к серверу метеостанции
fSocketSubscriber := zmq_socket(fConext, ZMQ_SUB);
zmq_connect(fSocketSubscriber, 'tcp://localhost:5556');
zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, PChar('Temperature '), 12);
fItems[0].socket := fSocketReceiver;
fItems[0].fd := 0;
fItems[0].events := ZMQ_POLLIN;
fItems[0].revents := 0;
fItems[1].socket := fSocketSubscriber;
fItems[1].fd := 0;
fItems[1].events := ZMQ_POLLIN;
fItems[1].revents := 0;
// Процесс обработки для обоих сокетов
while true do begin
zmq_poll(@fItems[0], Length(fItems), -1);
if ((fItems[0].revents and ZMQ_POLLIN) <> 0) then begin
fSize := zmq_recv(fSocketReceiver, @fMsgBuff, SizeOf(fMsgBuff), 0);
if fSize >= 0 then
// Выполнение задания
else
break;
if ((fItems[1].revents and ZMQ_POLLIN) <> 0) then begin
fSize := zmq_recv(fSocketSubscriber, @fMsgBuff, SizeOf(fMsgBuff), 0);
if fSize >= 0 then
// Обработка данных о погоде
else
break;
end;
end;
end;
zmq_close(fSocketReceiver);
zmq_close(fSocketSubscriber);
zmq_ctx_destroy(fConext);
Readln;
end.
Видим, что появилась интересная структура:
type
p_zmq_pollitem_t = ^zmq_pollitem_t;
zmq_pollitem_t = packed record
socket: Pointer; // Сокет ZMQ
fd: Integer; // Стандартный сокет, представленный дескриптором файла
events: SHORT; // Маска интересующих событий
revents: SHORT; // Маска произошедших событий
end;
Массив таких структур
fItems: array[0..1] of zmq_pollitem_t;
Инициализируется следующим образом:
fItems[0].socket := fSocketReceiver; fItems[0].fd := 0; fItems[0].events := ZMQ_POLLIN; fItems[0].revents := 0; fItems[1].socket := fSocketSubscriber; fItems[1].fd := 0; fItems[1].events := ZMQ_POLLIN; fItems[1].revents := 0;
В каждом элементе указан свой сокет и задана маска ожидаемых событий (ZMQ_POLLIN).
Далее вызывается метод:
zmq_poll(@fItems[0], Length(fItems), -1);Первый параметр - указатель на начало массива структур zmq_pollitem_t, второй - число элементов массива. Последний параметр - время ожидания события в ms. Если -1 - то ожидание будет выполняться бесконечно долго. Событие, произошедшее при выполнении zmq_poll(), заносится в битовую маску revents.
PS: Честно говоря, не въехал, как использовать поле fd. Что "за стандартный сокет"? :(
PPS: Благодаря уважаемой Хильде (см. комментарий Hilda Slyusar28 февраля 2016 г., 10:08), появилось понимание, что есть "стандартный сокет". Так вот, fd - дескриптор сокета из библиотеки сокетов используемых протоколов. Для MS Windows (линии NT), например, доступ к tcp/udp сокетам реализован в системной библиотеке сокетов, интерфейс к которой объявлен в winsock2.h. При использовании такого "низкоуровнего" сокета в структуре zmq_pollitem_t, полю socket следует присвоить значение NULL(nil для Delphi), полю fd - значение дескриптора "низкоуровнего" сокета, остальные элементы - так же, как в примере выше.
Дальше поговорим о том, как элементам сети найти друг друга. (Продолжение).
Спасибо за Ваши статьи! Очень подробно написано и помогает еще раз переструктурировать знания.
ОтветитьУдалитьДля создания хартбитинга использовала отправку и получение сообщений через UDP. Для того, чтобы zmq_poll получал эти данные - использовала как раз параметр fd. То есть socket = NULL, fd = обычный сокет, созданный стандартными средставами (socket() в с++), остальные два параметра остаются прежними - ZMQ_POLLIN и 0.
Спасибо за пояснение!
Удалить