(Начало - здесь).
Дальше рассмотрим кодирование брокера с балансировкой нагрузки с использованием API высокого уровня - CZMQ.
Код простого брокера с балансировкой нагрузки:
При создании многонитевого Delphi приложения обязательно задавать значение True для переменной System.IsMultyThread. Но этого делать не нужно, если прямо или косвенно вызывается процедура BeginThread, в которой также задается это значение.
Объект workers теперь - не массив, а список типа p_zlist_t.
Видна работа с фреймами:
Объект workers теперь - не массив, а список типа p_zlist_t.
Видна работа с фреймами:
Составные сообщения теперь принимаются целиком, а не частями:
"Чистый" ZeroMQ в случае прерывания (по Ctrl+C) вернет код завершения операции -1 и установит код ошибка EINTR.
Здесь же (в CZMQ) операция просто вернет nil:
Ну и забавные вещи вроде строк с форматированием ("Си"шный format()):
Отмечу, что zsocket_connect() / zsocket_bind() - процедуры с переменным числом параметров:
А так как процедуры сишные - не забываем, в случае использования констант в параметрах, указывать модификатор типа для односимвольных строк:
То же самое:
То есть там, где требуется "настоящий" адрес данных - в объявлении константы указываем модификатор типа.
Или просто используем переменную.
...
Однако, несмотря на, казалось бы, все предпринятые меры, вроде этой:
- программа по Ctrl+C не прерывается. Вот почему: в CZMQ можно назначить обработчик событий SIGINT ("Ctrl+C") и SIGTERM. Если он назначен, то прерывания будут обрабатываться в нем, а обращение к сокетам не будет возвращать ошибку или nil.
При инициализации CZMQ назначен стандартный обработчик. Он очень простой:
Стандартный обработчик устанавливает глобальные флаги zctx_interrupted и
zsys_interrupted в 1. Эти значения этих флагов доступны для чтения (а если очень будет нужно - то и для записи) и в Delphi:
Таким образом, можно не проверять состояние сокетов, а ориентироваться на флаги.
Но раз уж мы пишем приложение, проверяющее статус завершения каждой операции, просто отключим стандартный обработчик, и программа станет реагировать на Ctrl+C. Вот так:
Добавим эту строчку срузу после строк
Все заработало: при нажатии Ctrl+C приложение закрывается.
~~~~~~~~~~~~~~~~~~~~~
Теперь тот же самый брокер с балансировкой нагрузки, но с использованием реактора CZMQ:
В данном примере мы используем РЕАКТОР CZMQ.
Вот что он позволяет.
Естественно, цикл опроса внутри использует zmq_poll(). Всегда, когда происходит добавление или удаление ридеров, выполняется перестраивание структуры данных для опроса и пересчитываются таймауты, чтобы найти следующий таймер.
После этого вызывается ридер и обработчики таймера для каждого сокета и таймера, которые требуют обслуживания.
Когда мы используем шаблон "Реактор", код как бы выворачивается наизнанку. Код внешне выглядит примерно так:
Таким образом, обработка сообщений ZMQ размещена в коде специальных методов - обработчиков. При этом один и тот же обработчик может обрабатывать как активность сокета (поступление данных), так и активность таймеров сокета.
Для реализации конкретно нашей логики реактора создается специальная структура:
См реактор - структуру типа zloop_t. Реактор создается
reactor := zloop_new();
Затем в реакторе регистрируется массив поллинга, функция - обработчик и дополнительные праметры - self:
zloop_poller(reactor, @poller, @s_handle_backend, self);
После этого реактор запускается.:
zloop_start(reactor);
Почти вся логика реализуется в функциях - обработчиках.
Дальше я расскажу о реакторах CZMQ подробнее: (продолжение).
Дальше рассмотрим кодирование брокера с балансировкой нагрузки с использованием API высокого уровня - CZMQ.
Код простого брокера с балансировкой нагрузки:
program hl_LoadBalancingBrokerSimple; {$APPTYPE CONSOLE} // Брокер с балансировкой нагрузки. // Демонстрируется использование CZMQ //====================================================== // Имеется клиенты (c_NBR_CLIENTS шт) и рабочие (c_NBR_WORKERS шт). // Каждый рабочий при запуске сообщает брокеру о своей готовности (c_WORKER_READY). // Рабочий ставится в очередь. // // Клиент при запуске обращается к брокеру с заданием. Брокер отправляет // задание первому свободному рабочему. Рабочий, выполнив задание, возвращает // результат брокеру, брокер пересылает результат клиенту - заказчику. uses SysUtils, zmq_h, czmq_h, ZMQ_Utils; const c_NBR_CLIENTS = 5; // Число клиентов c_NBR_WORKERS = 2; // // Число рабочих // Конечные точки подключения c_url_clients = 'tcp://%s:5555'; //'inproc://clients'; c_url_workers = 'tcp://%s:5556'; //'inproc://workers'; c_domain = 'localhost'; // Сетевой адрес брокера c_interf: string = '*'; // Адрес для биндинга c_WORKER_READY: byte = 1; // Сигнал готовности рабочего function client_task(args: Pointer): Pointer; cdecl; var // Функция нити клиента client: Pointer; ctx: p_zctx_t; reply: PChar; begin ctx := args; // zctx_new(); client := zsocket_new(ctx, ZMQ_REQ); zsocket_connect(client, c_url_clients, c_domain); // Запрос - ответ while true do begin zstr_send(client, 'HELLO'); reply := zstr_recv(client); if (reply = nil) then break; z_Log('Client: ' + reply); zstr_free(reply); sleep(1); end; // zctx_destroy(ctx); Result := nil; end; function worker_task(args: Pointer): Pointer; cdecl; var // Функция нити рабочего ctx: p_zctx_t; frame: p_zframe_t; msg: p_zmsg_t; worker: Pointer; begin ctx := args; // zctx_new(); worker := zsocket_new(ctx, ZMQ_REQ); zsocket_connect(worker, c_url_workers, c_domain); // Сообщаем брокеру о готовности работать frame := zframe_new(@c_WORKER_READY, 1); zframe_send(frame, worker, 0); // ОБработка сообщений по мере их получения while True do begin msg := zmsg_recv(worker); if msg = nil then break; // Interrupted zframe_reset(zmsg_last(msg), PChar('OK'), 2); zmsg_send(msg, worker); end; // zctx_destroy(ctx); Result := nil; end; // Основная нить, порождающий дочерние // Если нажать Ctrl-C, работа в нитях завершится, и главная нить тоже завершится. procedure DoMain; var backend: Pointer; ctx: p_zctx_t; frame: p_zframe_t; frontend: Pointer; i: Integer; identity: p_zframe_t; items: array[0..1] of zmq_pollitem_t; msg: p_zmsg_t; rc: Integer; workers: p_zlist_t; begin ctx := zctx_new(); // Контекст zsys_handler_set(nil); frontend := zsocket_new(ctx, ZMQ_ROUTER); backend := zsocket_new(ctx, ZMQ_ROUTER); zsocket_bind(frontend, c_url_clients, c_interf); // Привязка к интерфейсу zsocket_bind(backend, c_url_workers, c_interf); for i := 0 to pred(c_NBR_CLIENTS) do // Запуск нитей клиентов zthread_new(@client_task, ctx); for i := 0 to Pred(c_NBR_WORKERS) do // Запуск нитей рабочих zthread_new(@worker_task, ctx); // Очередь доступных рабочих workers := zlist_new(); // Главный цикл балансировщика нагрузок. // Длинне, чем с реактором, но короче, чем на "чистом" ZMQ while True do begin items[0].socket := backend; items[0].fd := 0; items[0].events := ZMQ_POLLIN; items[0].revents := 0; items[1].socket := frontend; items[1].fd := 0; items[1].events := ZMQ_POLLIN; items[1].revents := 0; // Опрашиваем клиентов только если есть незанятые рабочие if zlist_size(workers) > 0 then rc := zmq_poll(@items[0], 2, -1) else rc := zmq_poll(@items[0], 1, -1); if rc = -1 then break; // прерывание // Обработка данных рабочих (от backend) if (items[0].revents and ZMQ_POLLIN) <> 0 then begin // Используем идентификацию msg := zmsg_recv(backend); if msg = nil then break; // Interrupted identity := zmsg_unwrap(msg); zlist_append(workers, identity); // Если это не сообщение о готовности, переслать сообщение клиенту frame := zmsg_first(msg); if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then zmsg_destroy(msg) else zmsg_send(msg, frontend); end; if (items[1].revents and ZMQ_POLLIN) <> 0 then begin // Получение запроса от клиента, передача первому незанятому рабочему msg := zmsg_recv(frontend); if msg <> nil then begin zmsg_wrap(msg, zlist_pop(workers)); zmsg_send(msg, backend); end; end; end; // Аккуратно завершаем все при выходе while zlist_size(workers) > 0 do begin frame := zlist_pop(workers); zframe_destroy(frame); end; zlist_destroy(workers); zctx_destroy(ctx); end; begin
//При создании многонитевого Delphi приложения обязательно задавать
// значение True для переменной System.IsMultyThread.
IsMultyThread := True;
DoMain;
Readln;
end.
При создании многонитевого Delphi приложения обязательно задавать значение True для переменной System.IsMultyThread. Но этого делать не нужно, если прямо или косвенно вызывается процедура BeginThread, в которой также задается это значение.
Объект workers теперь - не массив, а список типа p_zlist_t.
Видна работа с фреймами:
identity := zmsg_unwrap(msg); zlist_append(workers, identity);
Объект workers теперь - не массив, а список типа p_zlist_t.
Видна работа с фреймами:
identity := zmsg_unwrap(msg); zlist_append(workers, identity);
Составные сообщения теперь принимаются целиком, а не частями:
msg := zmsg_recv(frontend);
"Чистый" ZeroMQ в случае прерывания (по Ctrl+C) вернет код завершения операции -1 и установит код ошибка EINTR.
Здесь же (в CZMQ) операция просто вернет nil:
while true do begin zstr_send(client, 'HELLO'); reply := zstr_recv(client); if (reply = nil) then break; z_Log('Client: ' + reply); zstr_free(reply); sleep(1); end;
Ну и забавные вещи вроде строк с форматированием ("Си"шный format()):
const // Конечные точки подключения c_url_clients = 'tcp://%s:5555'; //'inproc://clients'; c_url_workers = 'tcp://%s:5556'; //'inproc://workers'; c_domain = 'localhost'; // Сетевой адрес брокера c_interf: string = '*'; // Адрес для биндинга. ... begin ... zsocket_connect(client, c_url_clients, c_domain); ... zsocket_bind(frontend, c_url_clients, c_interf);
Отмечу, что zsocket_connect() / zsocket_bind() - процедуры с переменным числом параметров:
// Connect a socket to a formatted endpoint // Returns 0 if OK, -1 if the endpoint was invalid. function zsocket_connect(self: pointer; format: PChar): Integer; varargs; cdecl; external cZMQ_DllName;
А так как процедуры сишные - не забываем, в случае использования констант в параметрах, указывать модификатор типа для односимвольных строк:
const c_interf: string = '*'; // Адрес для биндинга.
То же самое:
const c_WORKER_READY: byte = 1; ... begin ... frame := zframe_new(@c_WORKER_READY, 1); ... if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then ...
То есть там, где требуется "настоящий" адрес данных - в объявлении константы указываем модификатор типа.
Или просто используем переменную.
...
Однако, несмотря на, казалось бы, все предпринятые меры, вроде этой:
if msg = nil then break; // Interrupted
- программа по Ctrl+C не прерывается. Вот почему: в CZMQ можно назначить обработчик событий SIGINT ("Ctrl+C") и SIGTERM. Если он назначен, то прерывания будут обрабатываться в нем, а обращение к сокетам не будет возвращать ошибку или nil.
При инициализации CZMQ назначен стандартный обработчик. Он очень простой:
// Default internal signal handler static void s_signal_handler (int signal_value) { zctx_interrupted = 1; zsys_interrupted = 1; }
Стандартный обработчик устанавливает глобальные флаги zctx_interrupted и
zsys_interrupted в 1. Эти значения этих флагов доступны для чтения (а если очень будет нужно - то и для записи) и в Delphi:
function zsys_interrupted(): Integer; function zctx_interrupted(): Integer;
Таким образом, можно не проверять состояние сокетов, а ориентироваться на флаги.
Но раз уж мы пишем приложение, проверяющее статус завершения каждой операции, просто отключим стандартный обработчик, и программа станет реагировать на Ctrl+C. Вот так:
zsys_handler_set(nil);
Добавим эту строчку срузу после строк
begin ctx := zctx_new(); // Контекст
Все заработало: при нажатии Ctrl+C приложение закрывается.
~~~~~~~~~~~~~~~~~~~~~
Теперь тот же самый брокер с балансировкой нагрузки, но с использованием реактора CZMQ:
program hl_LoadBalancingBroker; {$APPTYPE CONSOLE} // Брокер с балансировкой нагрузки. // Демонстрируется использование CZMQ и реактора //============================================== // Имеется клиенты (c_NBR_CLIENTS шт) и рабочие (c_NBR_WORKERS шт). // Каждый рабочий при запуске сообщает брокеру о своей готовности (c_WORKER_READY). // Рабочий ставится в очередь. // // Клиент при запуске обращается к брокеру с заданием. Брокер отправляет // задание первому свободному рабочему. Рабочий, выполнив задание, возвращает // результат брокеру, брокер пересылает результат клиенту - заказчику. uses SysUtils , zmq_h , czmq_h , ZMQ_Utils ; const c_NBR_CLIENTS = 1; // Число клиентов c_NBR_WORKERS = 1; // // Число рабочих // Конечные точки подключения c_url_clients = 'tcp://%s:5555'; //'inproc://clients'; c_url_workers = 'tcp://%s:5556'; //'inproc://workers'; c_domain = 'localhost'; // Сетевой адрес брокера c_interf: string = '*'; // Адрес для биндинга c_WORKER_READY: byte = 1; // Сигнал готовности рабочего function client_task(args: Pointer): Pointer; cdecl; // Функция нити клиента var client: Pointer; ctx: p_zctx_t; reply: PChar; begin ctx := args; // zctx_new(); client := zsocket_new(ctx, ZMQ_REQ); zsocket_connect(client, c_url_clients, c_domain); // Запрос - ответ while true do begin zstr_send(client, 'HELLO'); reply := zstr_recv(client); if (reply = nil) then break; z_Log('Client: ' + reply); zstr_free(reply); sleep(1); end; // zctx_destroy(ctx); Result := nil; end; function worker_task(args: Pointer): Pointer; cdecl; // Функция нити рабочего var ctx: p_zctx_t; frame: p_zframe_t; msg: p_zmsg_t; worker: Pointer; begin ctx := args; // zctx_new(); worker := zsocket_new(ctx, ZMQ_REQ); zsocket_connect(worker, c_url_workers, c_domain); // Сообщаем брокеру о готовности работать frame := zframe_new(@c_WORKER_READY, 1); zframe_send(frame, worker, 0); // ОБработка сообщений по мере их получения while True do begin msg := zmsg_recv(worker); if msg = nil then break; // Interrupted zframe_reset(zmsg_last(msg), PChar('OK'), 2); zmsg_send(msg, worker); end; // zctx_destroy(ctx); Result := nil; end; // Структура, передаваемая в реактор type p_lbbroker_t = ^lbbroker_t; lbbroker_t = packed record frontend: Pointer; // Сокет - слушать клиентов backend: Pointer; // Сокет - слушать рабочих workers: p_zlist_t; // Список свободных рабочих end; // Устройство реактора таково, что все сообщения, приходящие в сокет, // передаются ректором в функцию обработки. У нас - два обработчика: // для фронтэнда (клиенты) и для бэкэнда (рабочие) // Обработка ввода от клиентjd (на фронтэнд) function s_handle_frontend(loop: p_zloop_t; poller: p_zmq_pollitem_t; arg: Pointer): Integer; cdecl; var msg: p_zmsg_t; self: p_lbbroker_t; begin self := p_lbbroker_t(arg); msg := zmsg_recv(self.frontend); // Сообщение от клиента if msg <> nil then begin zmsg_wrap(msg, p_zframe_t(zlist_pop(self.workers))); zmsg_send(msg, self.backend); // Завершение обработчика, если нет доступных рабочих if zlist_size(self.workers) = 0 then begin poller.socket := self.frontend; poller.fd := 0; poller.events := ZMQ_POLLIN; poller.revents := 0; zloop_poller_end(loop, poller); end; end; Result := 0; end; // Обработка ввода от рабочих (на бэкэнд) function s_handle_backend(loop: p_zloop_t; poller: p_zmq_pollitem_t; arg: Pointer): Integer; cdecl; var frame: p_zframe_t; identity: p_zframe_t; msg: p_zmsg_t; self: p_lbbroker_t; begin // Для балансировки нагрузки снова используем идентификацию self := p_lbbroker_t(arg); msg := zmsg_recv(self.backend); if msg <> nil then begin identity := zmsg_unwrap(msg); zlist_append(self.workers, identity); if zlist_size(self.workers) = 1 then begin // Разрешить ридер фронтэнда poller.socket := self.frontend; poller.fd := 0; poller.events := ZMQ_POLLIN; poller.revents := 0; zloop_poller(loop, poller, @s_handle_frontend, self); end; // Переброска сообщения клиенту, если это не "ГОТОВ". frame := zmsg_first(msg); if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then zmsg_destroy(msg) else zmsg_send(msg, self.frontend); end; result := 0; end; // Основная нить, порождающий дочерние, запускающий затем реактор. // Если нажать Ctrl-C, реактор завершится и главная нить тоже завершится. procedure DoMain; var ctx: p_zctx_t; frame: p_zframe_t; i: Integer; poller: zmq_pollitem_t; reactor: p_zloop_t; self: p_lbbroker_t; begin ctx := zctx_new(); // Контекст New(self); // Данные реактора self.frontend := zsocket_new(ctx, ZMQ_ROUTER); // Сокеты реактора self.backend := zsocket_new(ctx, ZMQ_ROUTER); zsocket_bind(self.frontend, c_url_clients, c_interf); // Привязка к интерфейсу zsocket_bind(self.backend, c_url_workers, c_interf); for i := 0 to pred(c_NBR_CLIENTS) do // Запуск нитей клиентов zthread_new(@client_task, ctx); for i := 0 to Pred(c_NBR_WORKERS) do // Запуск нитей рабочих zthread_new(@worker_task, ctx); // Очередь доступных рабочих self.workers := zlist_new(); // Подготовка и запуск реактора reactor := zloop_new(); poller.socket := self.backend; poller.fd := 0; poller.events := ZMQ_POLLIN; poller.revents := 0; zloop_poller(reactor, @poller, @s_handle_backend, self); zloop_start(reactor); zloop_destroy(reactor); // Аккуратно завершаем все при выходе while zlist_size(self.workers) > 0 do begin frame := zlist_pop(self.workers); zframe_destroy(frame); end; zlist_destroy(self.workers); zctx_destroy(ctx); Dispose(self); end; begin
//При создании многонитевого Delphi приложения обязательно задавать
// значение True для переменной System.IsMultyThread. IsMultyThread := True;
DoMain;
Readln;
end.
В данном примере мы используем РЕАКТОР CZMQ.
Вот что он позволяет.
- Любому сокету может быть назначить ридер: код, который вызывается всякий раз, когда в сокете появляются данные, готовые для чтения.
- Отключить ридер для сокета.
- Назначить таймер, который будет запускаться один или несколько раз через указанные промежутки времени.
- Отключить таймер.
Естественно, цикл опроса внутри использует zmq_poll(). Всегда, когда происходит добавление или удаление ридеров, выполняется перестраивание структуры данных для опроса и пересчитываются таймауты, чтобы найти следующий таймер.
После этого вызывается ридер и обработчики таймера для каждого сокета и таймера, которые требуют обслуживания.
Когда мы используем шаблон "Реактор", код как бы выворачивается наизнанку. Код внешне выглядит примерно так:
var reactor := p_zloop_t; ... begin ... reactor := zloop_new (); zloop_reader(reactor, self.backend, @s_handle_backend, self); zloop_start (reactor); zloop_destroy (reactor);
Таким образом, обработка сообщений ZMQ размещена в коде специальных методов - обработчиков. При этом один и тот же обработчик может обрабатывать как активность сокета (поступление данных), так и активность таймеров сокета.
Для реализации конкретно нашей логики реактора создается специальная структура:
// Структура, передаваемая в реактор type p_lbbroker_t = ^lbbroker_t; lbbroker_t = packed record frontend: Pointer; // Сокет - слушать клиентов backend: Pointer; // Сокет - слушать рабочих workers: p_zlist_t; // Список свободных рабочих end;
См реактор - структуру типа zloop_t. Реактор создается
reactor := zloop_new();
Затем в реакторе регистрируется массив поллинга, функция - обработчик и дополнительные праметры - self:
zloop_poller(reactor, @poller, @s_handle_backend, self);
После этого реактор запускается.:
zloop_start(reactor);
Почти вся логика реализуется в функциях - обработчиках.
Дальше я расскажу о реакторах CZMQ подробнее: (продолжение).
Комментариев нет :
Отправить комментарий