(Начало - здесь)
Прототипирование потока данных о состоянии
Так как в каждом потоке данных сокетов есть свои маленькие ловушки для раззяв, будем добавлять новые потоки данных в реальный код постепенно, шаг за шагом. Начнем с потока состояния.
Код прототипа обработчика потока состояния:
program Peering1; {$APPTYPE CONSOLE} uses SysUtils, zmq_h, czmq_h, zmq_Utils; // Имитация межброкерного взаимодействия (часть 1) // Прототип потока состояний procedure doMain; var argn: Integer; available: PChar; ctx: p_zctx_t; items: zmq_pollitem_t; peer: string; peer_name: PChar; rc: Integer; self: string; statebe: Pointer; statefe: Pointer; begin // Аргументы командной строки. // Первый аргумент - имя брокера // Прочие аргументы - имена брокеров - партнеров // if (ParamCount < 2) then begin z_Log('syntax: peering1 me {you}...'); exit; end; self := ParamStr(1); z_Log(Format('I: preparing broker at %s...', [self])); Randomize; ctx := zctx_new(); // Биндинг бэкэнд-сокета состояния к конечной точке statebe := zsocket_new(ctx, ZMQ_PUB); rc := zsocket_bind(statebe, 'tcp://127.0.0.1:%s', self); Assert(rc >= 0); // Подключение statefe ко всем партнерам statefe := zsocket_new(ctx, ZMQ_SUB); zsocket_set_subscribe(statefe, ''); for argn := 2 to ParamCount do begin peer := ParamStr(argn); z_Log('I: connecting to state backend at ' + peer); rc := zsocket_connect(statefe, 'tcp://localhost:%s', peer); Assert(rc >= 0); end; // В главной цикле партнерам рассылаются сообщения о состоянии и собираются // сообщения о состоянии от партнеров. Таймаут zmq_poll определяет свой // собственный хартбит: while (true) do begin // Опрос активности, или 1 сек. таймаута items.socket := statefe; items.fd := 0; items.events := ZMQ_POLLIN; items.revents := 0; rc := zmq_poll(@items, 1, 1000 * ZMQ_POLL_MSEC); if rc = -1 then break; // Прерывание // Обработка входящих сообщений о состоянии if (items.revents and ZMQ_POLLIN) <> 0 then begin peer_name := zstr_recv(statefe); available := zstr_recv(statefe); z_Log(Format('%s - %s workers free', [peer_name, available])); zstr_free(peer_name); zstr_free(available); end else begin // Отправка (публикация) случайных значений о стостоянии zstr_sendm(statebe, PChar(self)); zstr_send(statebe, PChar(IntToStr(Random(10)))); end end; zctx_destroy(ctx); end; begin
//При создании многонитевого Delphi приложения обязательно задавать
// значение True для переменной System.IsMultyThread. IsMultyThread := True;
zsys_handler_set(nil); // Ctrl+C
doMain;
readln;
end.
Код рабочий, запускать нужно из командной строки с параметрами:
Peering1.exe первый_параметр второй_и_последующие_параметры
Первый параметр - свой порт, второй и последующие - порты партнеров.
Например:
Peering1.exe 5555 5556
Peering1.exe 5556 5555
Пояснения к коду:
- Каждый брокер идентифицируется уникальным номером порта, который используется при конструировании адреса конечной точки. Реальный брокер, размещенный на отдельном компьютере, будет идентифицироваться по более сложной схеме. Эти схемы мы рассмотрим позднее, а сейчас просто используем домен localhost (или ip 127.0.0.1).
- Мы использовали zmq_poll() в качестве ядра программы. Он обрабатывал входящие сообщения и отправлял сообщения о состоянии. Мы отправляем сообщение о состоянии только тогда, когда у нас нет никаких входящих сообщений и мы ожидали 1 секунду. Если мы будем рассылать сообщения каждый раз, получатся бури из сообщений.
- Мы используем составное (из двух частей) сообщение, содержащее адрес и данные. Нужно знать адрес издателя (того, кто публикует сообщения), чтобы правильно отправлять его задания, и единственным способом сделать это представляется включение адреса в часть сообщения.
- Мы не назначаем идентификаторов для подписчиков, так как если бы мы это делали, то получали бы устаревшую информацию при подключении к работающим брокерам.
- Для издателя мы не устанавливаем значение HWM, но это было бы хорошей идеей при использовании ZeroMQ v2.x.
peering1 5555 5556 5557 # Старт на порту 5555, коннект к 5556 и 5557
peering1 5556 5555 5557 # Старт на порту 5556, коннект к 5555 и 5557
peering1 5557 5555 5556 # Старт на порту 5557, коннект к 5555 и 5556
Видим, как каждый кластер сообщает о своем своем запуске и коннекте к партнерам, а через пару секунд они все начинают бодро (примерно раз в секунду) выводить случайные числа "о состоянии" своих партнеров.
В реальной жизни мы сообщения о состоянии не через регулярные интервалы времени, а только тогда, когда произошло изменение этого состояния, например, когда рабочие становятся доступными или недоступными. Может показаться, что это займет много трафика, но сообщения о состоянии маленькие, и мы прияли как факт, что межкластерные соединения очень быстрые.
Если бы мы хотели отправлять сообщения о состоянии через точные интервалы, мы бы создали дочернюю нить с сокетом statebe. Из внешней главной нити мы бы посылали нерегулярные сообщения в эту нить об изменении состояния и разрешили бы дочерней нити объединять эти сообщения в регулярные исходящие сообщения. Для этого пришлось бы проделать больше работы, чем нужно.
Дальше мы выполним прототипирование локального и облачного потоков данных.
(Продолжение).
Комментариев нет :
Отправить комментарий