(Начало здесь)
Согласование работы между узлами сети.
Если нужно согласовывать работы набора узлов в сети, то сокеты типа PAIR уже не так хороши.
Это как раз те области, где стратегии использования нитей и узлов различаются. В большинстве случаев узлы приходят и уходят "сами по себе", а нити обычно статичны. Сокеты типа PAIR не выполнят автоматическое переподключение, если удаленный узел сети уйдет, а потом появится снова. Другим существенным различием в применении узлов и применением нитей является то, что обычно мы имеем фиксированное число нитей , в то время как число рабочих узлов сети меняется.
Рассмотрим уже знакомый сценарий, реализующий схему "Запрос-Ответ" (с метостанцией - издателем и кучей клиентов-подписчиков) и попробуем координировать узлы так, чтобы быть уверенными в том, что при запуске подписчики-клиенты не потеряют данные.
Схема работы приложения:
Поехали. Модифицируем код проекта "Метео". Сервис начинает публиковать данные только после коннекта 10 клиентов. Пакеты данных публикуются 10 000 раз, после чего публикуется сообщение 'END'.
Клиенты при запуске сообщают сервису о своем появлении и подписываются на данные сервиса.
Полученные по подписке данные выводятся в консоль клиента, число полученных пакетов подсчитывается, . Рабочий цикл клиента прерывается при получении сообщения 'END'.
Код сервиса:
Появилось нечто новое:
Связь "сокет-сокет" можно рассматривать как водопровод. Объем воды, который может вытекать и втекать, в общем случае разный: одн сокет может принимать данные от нескольких сокетов и наоборот. Значение максимально допустимого уровня воды называется "Уровень Высокой Воды" (High Water Mark). Для сокетов ZMQ значение определяет верхнюю границу максимального числа сообщений, которые могут накопиться в исходящей очереди сокета. Поговорим о Высокой Воде позднее.
Мы не можем быть уверены, что коннект SUB будет завершен к тому времени, когда завершится диалог REQ/REP. Вообще нет никакой гарантии того, что исходящие соединения завершатся в том или ином порядке, если вы используете любой транспорт за исключением InProc.
Ну, в примере мы воткнули ожидание (sleep(1)) после подпиской и синхропосылками REQ/REP.
Что как бы работает, но тоже в общем случае не гарантирует.
Более надежная схема могла бы выглядеть так:
Дальше поговорим о такой чудесной вещи, как Нуль-Копия: (Продолжение).
Согласование работы между узлами сети.
Если нужно согласовывать работы набора узлов в сети, то сокеты типа PAIR уже не так хороши.
Это как раз те области, где стратегии использования нитей и узлов различаются. В большинстве случаев узлы приходят и уходят "сами по себе", а нити обычно статичны. Сокеты типа PAIR не выполнят автоматическое переподключение, если удаленный узел сети уйдет, а потом появится снова. Другим существенным различием в применении узлов и применением нитей является то, что обычно мы имеем фиксированное число нитей , в то время как число рабочих узлов сети меняется.
Рассмотрим уже знакомый сценарий, реализующий схему "Запрос-Ответ" (с метостанцией - издателем и кучей клиентов-подписчиков) и попробуем координировать узлы так, чтобы быть уверенными в том, что при запуске подписчики-клиенты не потеряют данные.
Схема работы приложения:
- Издатель (сервис метеостанции) заранее знает, сколько будет подписчиков. То есть, это просто волшебное число, которое он откуда-то получает.
- Издатель запускается и ждет, пока подключатся все подписчики. Эта часть и есть процесс согласования. Каждый подписчик подписывается, а затем через другой сокет сообщает издателю, что он готов.
- Когда к издателю подключатся все подписчики, он начинает публиковать данные.
В данном случае для согласования действий между подписчиком и издателем мы будем использовать пары сокетов REQ-REP.
Поехали. Модифицируем код проекта "Метео". Сервис начинает публиковать данные только после коннекта 10 клиентов. Пакеты данных публикуются 10 000 раз, после чего публикуется сообщение 'END'.
Клиенты при запуске сообщают сервису о своем появлении и подписываются на данные сервиса.
Полученные по подписке данные выводятся в консоль клиента, число полученных пакетов подсчитывается, . Рабочий цикл клиента прерывается при получении сообщения 'END'.
Код сервиса:
program SynPS_Service;
{$APPTYPE CONSOLE}
// Меостанция
// Сервис - издатель с синхронизацией запуска
uses
FastMM4, SysUtils, ZMQ_h, ZMQ_Utils;
const
c_SUBSCRIBERS_EXPECTED = 10; // Ждем 10 подписчиков!
var
fContext: Pointer; // Send message from buffer, which we allocate and 0MQ will free for us
fSocketPublisher: Pointer;
fSocketSyncService: Pointer;
fDummy: string;
fMsgStr: string;
fSubscribers: Integer = 0;
fSndhwm: Integer = 1100000;
i: Integer;
begin
fContext := zmq_ctx_new();
// Сокет для общения с клиенатами
fSocketPublisher := zmq_socket(fContext, ZMQ_PUB);
zmq_setsockopt(fSocketPublisher, ZMQ_SNDHWM, @fSndhwm, SizeOf(fSndhwm));
zmq_bind(fSocketPublisher, 'tcp://*:5561');
// Сокет для приема сигналов
fSocketSyncService := zmq_socket(fContext, ZMQ_REP);
zmq_bind(fSocketSyncService, 'tcp://*:5562');
// Получение синхросигналов от подписчиков
Writeln('Waiting for subscribers...');
while fSubscribers < c_SUBSCRIBERS_EXPECTED do begin
// - ожидание синхрозапроса
s_recv(fSocketSyncService);
// - отправка синхроответа
s_send(fSocketSyncService, '');
Inc(fSubscribers);
end;
// Теперь раздача 10 000 пакетов оповещений, а затем - отправка 'END'
Randomize;
for i := 0 to 9999 do begin
Sleep(1); // Типа измеряет что-то
// Температура
fMsgStr := Format('Temperature : %d C', [20 - Random(40)]);
s_send(fSocketPublisher, fMsgStr);
// Атм. давление
fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]);
s_send(fSocketPublisher, fMsgStr);
// Скорость ветра
fMsgStr := Format('Wind : %d m/s', [Random(10)]);
s_send(fSocketPublisher, fMsgStr);
end;
s_send(fSocketPublisher, 'END');
Writeln('Publisher stopped...');
zmq_close(fSocketPublisher);
zmq_close(fSocketSyncService);
zmq_ctx_destroy(fContext);
Readln(fDummy);
end.
Появилось нечто новое:
var ... fSndhwm: Integer = 1100000; ... begin ... zmq_setsockopt(fSocketPublisher, ZMQ_SNDHWM, @fSndhwm, SizeOf(fSndhwm)); ...Опция ZMQ_SNDHWM позволяет настроить исходящую пропускную способность сокета.
Связь "сокет-сокет" можно рассматривать как водопровод. Объем воды, который может вытекать и втекать, в общем случае разный: одн сокет может принимать данные от нескольких сокетов и наоборот. Значение максимально допустимого уровня воды называется "Уровень Высокой Воды" (High Water Mark). Для сокетов ZMQ значение определяет верхнюю границу максимального числа сообщений, которые могут накопиться в исходящей очереди сокета. Поговорим о Высокой Воде позднее.
Код клиента:
program SynPS_Client;
{$APPTYPE CONSOLE}
// Меостанция
// Клиент - подписчик с синхронизацией запуска
uses
SysUtils, ZMQ_h, ZMQ_Utils;
var
fContext: Pointer;
fSocketSubscriber: Pointer;
fSocketSyncClient: Pointer;
fMsgStr: string;
fCnt: Integer = 0;
const
cFilter1 = 'Temperature';
cFilter2 = 'Pressure';
cFilter3 = 'Wind';
begin
fContext := zmq_ctx_new(); // Инициализация
// Сначала подключаем сокет подписчика
fSocketSubscriber := zmq_socket(fContext, ZMQ_SUB);
zmq_connect(fSocketSubscriber, 'tcp://localhost:5561');
zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, nil, 0); // Настройка сокета
Sleep(1);// ZMQ настолько шустрый, что нужно подождать...
// Теперь синхронизируемся с издателем
fSocketSyncClient := zmq_socket(fContext, ZMQ_REQ);
zmq_connect(fSocketSyncClient, 'tcp://localhost:5562');
s_send(fSocketSyncClient, ''); // Отправка сообщения о готовности
s_recv(fSocketSyncClient); // Ожидание подтверждения
Writeln('Subscriber started...');
while True do begin
fMsgStr := s_recv(fSocketSubscriber); // Прием данных
if fMsgStr = 'END' then
Break;
Writeln(fMsgStr);
Inc(fCnt)
end;
Writeln('Received ', fCnt, ' updates');
zmq_close(fSocketSubscriber);
zmq_close(fSocketSyncClient);
zmq_ctx_destroy(fContext);
Readln;
end.
Мы не можем быть уверены, что коннект SUB будет завершен к тому времени, когда завершится диалог REQ/REP. Вообще нет никакой гарантии того, что исходящие соединения завершатся в том или ином порядке, если вы используете любой транспорт за исключением InProc.
Ну, в примере мы воткнули ожидание (sleep(1)) после подпиской и синхропосылками REQ/REP.
Что как бы работает, но тоже в общем случае не гарантирует.
Более надежная схема могла бы выглядеть так:
- Издатель открывает PUB - сокет и начинает передавать сообщения "Hello"(без данных).
- Подписчик подключает SUB - сокет и, когда тот принимает сообщение "Hello", то сообщает об этом издателю через пару сокетов REQ/REP.
- Когда издатель получит необходимое число подтверждений от коннекте от подписчиков, он начинает публиковать реальные данные.
Дальше поговорим о такой чудесной вещи, как Нуль-Копия: (Продолжение).

Комментариев нет :
Отправить комментарий