(Начало - здесь).
Шаблон "Реактор".
<Теоория.>Шаблон проектирования Реактор является событийно-управляемым шаблоном проектирования. Он предназначен для синхронной передачи запросов к сервису, поступающих параллельно от одного или нескольких источников. Обработчик сервиса демультиплексирует (разбирает) входящие запросы и синхронно отправляет их ассоциированным обработчикам запросов.</Теория>
Реактор напоминает процедуру окна Windows: разбирает поступившие сообщения и вызывает коллбеки, связанные с сообщениями.
Класс zloop - событийно-управляемый реактор.
Реактор обрабатывает элементы zmq_pollitem_t (поллеры или райтеры, сокеты) а также однократные или циклические таймеры. Разрешение реактора составляет 1 миллисекунда. Для уменьшения нагрузки на CPU во время низкой нагрузки реактор использует безтиковые таймеры.
Создание
Регистрация источников данных (таймеров, сокетов, поллеров) .
Запуск
Работа
Остановка
УдалениеПосле создания реактора в нем регистрируются сервисы и их обработчики: таймеры, сокеты, поллеры. Потом реактор запускается и начинается цикл обработки событий: срабатывание таймеров, поступление данных из сокетов или поллеров.
Пример работы реактора.
1. Создается пара сокетов: input, output.
2. В реактор загружаются:
- таймер 1, который должен сработать через 1000 мс.
- таймер 2, который должен сработать через 5 мс. Обработчик этого таймера уничтожает таймер 1.
- таймер 3, который должен сработать через 20 мс. Обработчик этого таймера отправит в сокет output сообщение "ping". Из сокета output сообщение отправится в сокет input.
- ридер сокета input, который, прочитав полученное сообщение "ping", вызовет свой обработчик, а последний просто вернет значение "-1": сигнал о том, что реактор пора остановить.
Код основной процедуры реактора:
procedure delphi_zloop_test(verbose: BOOL); var input: p_zsock_t; loop: p_zloop_t; output: p_zsock_t; rc: Integer; timer_id: Integer; begin // Тест Реактора CZMQ (zloop). z_Log(' * zloop: '); // Создается два PAIR сокета и соединяются по inproc output := zsock_new(ZMQ_PAIR); assert(Assigned(output)); zsock_bind(output, 'inproc://zloop.test'); input := zsock_new(ZMQ_PAIR); assert(Assigned(input)); zsock_connect(input, 'inproc://zloop.test'); //Создается реактор: loop := zloop_new(); assert(Assigned(loop)); zloop_set_verbose(loop, verbose); // Создаем 1K mсек таймер, timer_id := zloop_timer(loop, 1000, 1, s_timer_event, nil); // ...который будет отключим 5ти mсек таймером zloop_timer(loop, 5, 1, s_cancel_timer_event, @timer_id); // А этот таймер через 20 мсек пошлет сообщение "ping" в output zloop_timer(loop, 20, 1, s_timer_event, output); // Ридер сокета input: при получении сообщение "ping", завершает реактор rc := zloop_reader(loop, input, s_socket_event, nil); Assert(rc = 0); zloop_reader_set_tolerant(loop, input); zloop_start(loop); zloop_destroy(loop); assert(loop = nil); zsock_destroy(input); zsock_destroy(output); z_Log('OK'); end;
Код обработчиков:
function s_cancel_timer_event(loop: p_zloop_t; timer_id: integer; arg: Pointer): integer; cdecl; var cancel_timer_id: Integer; begin // Обработчик второго таймера, останавливаем таймер 1 cancel_timer_id := pInteger(arg)^; Result := zloop_timer_end(loop, cancel_timer_id); end; function s_timer_event(loop: p_zloop_t; timer_id: integer; output: Pointer): integer; cdecl; begin zstr_send(output, 'PING'); Result := 0; end; function s_socket_event(loop: p_zloop_t; handle: p_zsock_t; arg: Pointer): integer; cdecl; begin Result := -1; // Просто останавливаем реактор end;
Не забываем, что обработчики должны быть с модификатором cdecl;
Функции с суффиксом "_set_tolerant" указывают реактору, что ошибки данного источника следует игнорировать. Иначе в случае ошибки реактор просто перестанет обрабатывать данный объект:
zloop_reader_set_tolerant(loop, input);
Вроде все просто.
...
Объекты, которые можно загрузить в "реактор" (со своими обработчиками):
1. Сокеты. Для регистрации нового сокета и его обработчика используется функция
function zloop_reader(self: p_zloop_t; sock: p_zsock_t; handler: zloop_reader_fn; arg: Pointer): integer; cdecl; external cZMQ_DllName;
Если сокет регистрируется повторно, каждый зарегистрированный экземпляр получит свой собственный обработчик.
2. Элементы поллера zmq_pollitem_t:
function zloop_poller(self: p_zloop_t; item: p_zmq_pollitem_t; handler: zloop_fn; arg: Pointer): Integer; cdecl; external cZMQ_DllName;
3. Таймеры:
function zloop_timer(self: p_zloop_t; delay: size_t; times: size_t; handler: zloop_timer_fn; arg: Pointer): integer; cdecl; external cZMQ_DllName;
При этом указывается время задержки и количество запусков таймера. Если количество запусков - 0, таймер будет работать бесконечно. Функция возвращает идентификатор таймера (1, 2, 3...).
Ура, с реактором разобрались!
Будем создавать архитектуру сети N-1, когда несколько разных клиентов асинхронно общаются с одним сервером. (Продолжение).
Комментариев нет :
Отправить комментарий