(Начало здесь)
Согласование работы между узлами сети.
Если нужно согласовывать работы набора узлов в сети, то сокеты типа PAIR уже не так хороши.
Это как раз те области, где стратегии использования нитей и узлов различаются. В большинстве случаев узлы приходят и уходят "сами по себе", а нити обычно статичны. Сокеты типа PAIR не выполнят автоматическое переподключение, если удаленный узел сети уйдет, а потом появится снова. Другим существенным различием в применении узлов и применением нитей является то, что обычно мы имеем фиксированное число нитей , в то время как число рабочих узлов сети меняется.
Рассмотрим уже знакомый сценарий, реализующий схему "Запрос-Ответ" (с метостанцией - издателем и кучей клиентов-подписчиков) и попробуем координировать узлы так, чтобы быть уверенными в том, что при запуске подписчики-клиенты не потеряют данные.
Схема работы приложения:
Поехали. Модифицируем код проекта "Метео". Сервис начинает публиковать данные только после коннекта 10 клиентов. Пакеты данных публикуются 10 000 раз, после чего публикуется сообщение 'END'.
Клиенты при запуске сообщают сервису о своем появлении и подписываются на данные сервиса.
Полученные по подписке данные выводятся в консоль клиента, число полученных пакетов подсчитывается, . Рабочий цикл клиента прерывается при получении сообщения 'END'.
Код сервиса:
Появилось нечто новое:
Связь "сокет-сокет" можно рассматривать как водопровод. Объем воды, который может вытекать и втекать, в общем случае разный: одн сокет может принимать данные от нескольких сокетов и наоборот. Значение максимально допустимого уровня воды называется "Уровень Высокой Воды" (High Water Mark). Для сокетов ZMQ значение определяет верхнюю границу максимального числа сообщений, которые могут накопиться в исходящей очереди сокета. Поговорим о Высокой Воде позднее.
Мы не можем быть уверены, что коннект SUB будет завершен к тому времени, когда завершится диалог REQ/REP. Вообще нет никакой гарантии того, что исходящие соединения завершатся в том или ином порядке, если вы используете любой транспорт за исключением InProc.
Ну, в примере мы воткнули ожидание (sleep(1)) после подпиской и синхропосылками REQ/REP.
Что как бы работает, но тоже в общем случае не гарантирует.
Более надежная схема могла бы выглядеть так:
Дальше поговорим о такой чудесной вещи, как Нуль-Копия: (Продолжение).
Согласование работы между узлами сети.
Если нужно согласовывать работы набора узлов в сети, то сокеты типа PAIR уже не так хороши.
Это как раз те области, где стратегии использования нитей и узлов различаются. В большинстве случаев узлы приходят и уходят "сами по себе", а нити обычно статичны. Сокеты типа PAIR не выполнят автоматическое переподключение, если удаленный узел сети уйдет, а потом появится снова. Другим существенным различием в применении узлов и применением нитей является то, что обычно мы имеем фиксированное число нитей , в то время как число рабочих узлов сети меняется.
Рассмотрим уже знакомый сценарий, реализующий схему "Запрос-Ответ" (с метостанцией - издателем и кучей клиентов-подписчиков) и попробуем координировать узлы так, чтобы быть уверенными в том, что при запуске подписчики-клиенты не потеряют данные.
Схема работы приложения:
- Издатель (сервис метеостанции) заранее знает, сколько будет подписчиков. То есть, это просто волшебное число, которое он откуда-то получает.
- Издатель запускается и ждет, пока подключатся все подписчики. Эта часть и есть процесс согласования. Каждый подписчик подписывается, а затем через другой сокет сообщает издателю, что он готов.
- Когда к издателю подключатся все подписчики, он начинает публиковать данные.
В данном случае для согласования действий между подписчиком и издателем мы будем использовать пары сокетов REQ-REP.
Поехали. Модифицируем код проекта "Метео". Сервис начинает публиковать данные только после коннекта 10 клиентов. Пакеты данных публикуются 10 000 раз, после чего публикуется сообщение 'END'.
Клиенты при запуске сообщают сервису о своем появлении и подписываются на данные сервиса.
Полученные по подписке данные выводятся в консоль клиента, число полученных пакетов подсчитывается, . Рабочий цикл клиента прерывается при получении сообщения 'END'.
Код сервиса:
program SynPS_Service; {$APPTYPE CONSOLE} // Меостанция // Сервис - издатель с синхронизацией запуска uses FastMM4, SysUtils, ZMQ_h, ZMQ_Utils; const c_SUBSCRIBERS_EXPECTED = 10; // Ждем 10 подписчиков! var fContext: Pointer; // Send message from buffer, which we allocate and 0MQ will free for us fSocketPublisher: Pointer; fSocketSyncService: Pointer; fDummy: string; fMsgStr: string; fSubscribers: Integer = 0; fSndhwm: Integer = 1100000; i: Integer; begin fContext := zmq_ctx_new(); // Сокет для общения с клиенатами fSocketPublisher := zmq_socket(fContext, ZMQ_PUB); zmq_setsockopt(fSocketPublisher, ZMQ_SNDHWM, @fSndhwm, SizeOf(fSndhwm)); zmq_bind(fSocketPublisher, 'tcp://*:5561'); // Сокет для приема сигналов fSocketSyncService := zmq_socket(fContext, ZMQ_REP); zmq_bind(fSocketSyncService, 'tcp://*:5562'); // Получение синхросигналов от подписчиков Writeln('Waiting for subscribers...'); while fSubscribers < c_SUBSCRIBERS_EXPECTED do begin // - ожидание синхрозапроса s_recv(fSocketSyncService); // - отправка синхроответа s_send(fSocketSyncService, ''); Inc(fSubscribers); end; // Теперь раздача 10 000 пакетов оповещений, а затем - отправка 'END' Randomize; for i := 0 to 9999 do begin Sleep(1); // Типа измеряет что-то // Температура fMsgStr := Format('Temperature : %d C', [20 - Random(40)]); s_send(fSocketPublisher, fMsgStr); // Атм. давление fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]); s_send(fSocketPublisher, fMsgStr); // Скорость ветра fMsgStr := Format('Wind : %d m/s', [Random(10)]); s_send(fSocketPublisher, fMsgStr); end; s_send(fSocketPublisher, 'END'); Writeln('Publisher stopped...'); zmq_close(fSocketPublisher); zmq_close(fSocketSyncService); zmq_ctx_destroy(fContext); Readln(fDummy); end.
Появилось нечто новое:
var ... fSndhwm: Integer = 1100000; ... begin ... zmq_setsockopt(fSocketPublisher, ZMQ_SNDHWM, @fSndhwm, SizeOf(fSndhwm)); ...Опция ZMQ_SNDHWM позволяет настроить исходящую пропускную способность сокета.
Связь "сокет-сокет" можно рассматривать как водопровод. Объем воды, который может вытекать и втекать, в общем случае разный: одн сокет может принимать данные от нескольких сокетов и наоборот. Значение максимально допустимого уровня воды называется "Уровень Высокой Воды" (High Water Mark). Для сокетов ZMQ значение определяет верхнюю границу максимального числа сообщений, которые могут накопиться в исходящей очереди сокета. Поговорим о Высокой Воде позднее.
Код клиента:
program SynPS_Client; {$APPTYPE CONSOLE} // Меостанция // Клиент - подписчик с синхронизацией запуска uses SysUtils, ZMQ_h, ZMQ_Utils; var fContext: Pointer; fSocketSubscriber: Pointer; fSocketSyncClient: Pointer; fMsgStr: string; fCnt: Integer = 0; const cFilter1 = 'Temperature'; cFilter2 = 'Pressure'; cFilter3 = 'Wind'; begin fContext := zmq_ctx_new(); // Инициализация // Сначала подключаем сокет подписчика fSocketSubscriber := zmq_socket(fContext, ZMQ_SUB); zmq_connect(fSocketSubscriber, 'tcp://localhost:5561'); zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, nil, 0); // Настройка сокета Sleep(1);// ZMQ настолько шустрый, что нужно подождать... // Теперь синхронизируемся с издателем fSocketSyncClient := zmq_socket(fContext, ZMQ_REQ); zmq_connect(fSocketSyncClient, 'tcp://localhost:5562'); s_send(fSocketSyncClient, ''); // Отправка сообщения о готовности s_recv(fSocketSyncClient); // Ожидание подтверждения Writeln('Subscriber started...'); while True do begin fMsgStr := s_recv(fSocketSubscriber); // Прием данных if fMsgStr = 'END' then Break; Writeln(fMsgStr); Inc(fCnt) end; Writeln('Received ', fCnt, ' updates'); zmq_close(fSocketSubscriber); zmq_close(fSocketSyncClient); zmq_ctx_destroy(fContext); Readln; end.
Мы не можем быть уверены, что коннект SUB будет завершен к тому времени, когда завершится диалог REQ/REP. Вообще нет никакой гарантии того, что исходящие соединения завершатся в том или ином порядке, если вы используете любой транспорт за исключением InProc.
Ну, в примере мы воткнули ожидание (sleep(1)) после подпиской и синхропосылками REQ/REP.
Что как бы работает, но тоже в общем случае не гарантирует.
Более надежная схема могла бы выглядеть так:
- Издатель открывает PUB - сокет и начинает передавать сообщения "Hello"(без данных).
- Подписчик подключает SUB - сокет и, когда тот принимает сообщение "Hello", то сообщает об этом издателю через пару сокетов REQ/REP.
- Когда издатель получит необходимое число подтверждений от коннекте от подписчиков, он начинает публиковать реальные данные.
Дальше поговорим о такой чудесной вещи, как Нуль-Копия: (Продолжение).
Комментариев нет :
Отправить комментарий