(Начало - здесь)
Вернемся к схеме "Издатель - Подписчик". (Приложение "Метеостанция").
Вспомним, что при подписке данные можно фильтровать.
Однако, фильтрация по самим данным не всегда удобна. Куда удобнее фильтровать по значению ключевого поля, связанному с данными.
Например, пусть будут такие ключи:
- T - Температура
- P - Давление
- W - Скорость ветра
Или данные о скорости ветра разбить на две подгруппы:
- WS - ветер до 5 м/с;
- WF - ветер 5 м/с и более.
// Температура fMsgStr := Format('Temperature : %d C', [20 - Random(40)]); s_send(fSocketPublisher, 'T', ZMQ_SNDMORE); // 1я часть - ключ s_send(fSocketPublisher, fMsgStr);// 2я часть - тело сообщения
А клиент, при оформлении подписки, должен указать фильтр:
const cFilter1: string = 'T'; cFilter2: string = 'P'; cFilter3: string = 'W'; begin fContext := zmq_ctx_new(); // Инициализация // Подключаем сокет подписчика fSocketSubscriber := zmq_socket(fContext, ZMQ_SUB); zmq_connect(fSocketSubscriber, 'tcp://localhost:5561'); zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, PChar(cFilter1), Length(cFilter1) * SizeOf(Char)); // Настройка сокета
При этом сообщение будет фильтроваться по первому кадру, а приходить отфильтрованное сообщение будет полностью.
Сервис "Метео", составное сообщение:
program EnvPS_Service; {$APPTYPE CONSOLE} // Метеостанция uses FastMM4, SysUtils, ZMQ_h, ZMQ_Utils; var fContext: Pointer; fSocketPublisher: Pointer; fDummy: string; fMsgStr: string; i: Integer; begin fContext := zmq_ctx_new(); // Сокет для общения с клиенатами fSocketPublisher := zmq_socket(fContext, ZMQ_PUB); zmq_bind(fSocketPublisher, 'tcp://*:5561'); Writeln('Press Enter, please, when all subscribers will be ready...'); Readln; // Теперь раздача 100 собщений Randomize; for i := 0 to 99 do begin Sleep(1); // Типа измеряет что-то // Температура fMsgStr := Format('Temperature : %d C', [20 - Random(40)]); s_send(fSocketPublisher, 'T', ZMQ_SNDMORE); // 1я часть - ключ s_send(fSocketPublisher, fMsgStr);// 2я часть - тело сообщения // Атм. давление fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]); s_send(fSocketPublisher, 'P', ZMQ_SNDMORE); s_send(fSocketPublisher, fMsgStr); // Скорость ветра fMsgStr := Format('Wind : %d m/s', [Random(10)]); s_send(fSocketPublisher, 'W', ZMQ_SNDMORE); s_send(fSocketPublisher, fMsgStr); end; Writeln('Publisher stopped...'); zmq_close(fSocketPublisher); zmq_ctx_destroy(fContext); Readln(fDummy); end.
Клиент "Метео", составное сообщение
program EnvPS_Client; {$APPTYPE CONSOLE} // Метеостанция uses SysUtils, ZMQ_h, ZMQ_Utils; var fContext: Pointer; fSocketSubscriber: Pointer; fPrefix: string; fBody: string; fCnt: Integer = 0; const cFilter1: string = 'T'; cFilter2: string = 'P'; cFilter3: string = 'W'; begin fContext := zmq_ctx_new(); // Инициализация // Подключаем сокет подписчика fSocketSubscriber := zmq_socket(fContext, ZMQ_SUB); zmq_connect(fSocketSubscriber, 'tcp://localhost:5561'); zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, PChar(cFilter1), Length(cFilter1) * SizeOf(Char)); // Настройка сокета Writeln('Subscriber started...'); while True do begin fPrefix := s_recv(fSocketSubscriber); // Прием данных, префикс fBody := s_recv(fSocketSubscriber); // Прием данных, тело Writeln(fPrefix, ' ', fBody); Inc(fCnt); if fCnt > 10 then break end; Writeln('Received ', fCnt, ' updates'); zmq_close(fSocketSubscriber); zmq_ctx_destroy(fContext); Readln; end.
Запускам, видим, что получаем данные только о температуре:
Разбиение сообщения на части удобно, в том числе, например, для логического разделения составных данных.Например: Ключ - Адрес - Основное сообщение.
Поговорим, наконец, о вотермарках. (Продолжение).
Комментариев нет :
Отправить комментарий