вторник, 4 ноября 2014 г.

18. ZeroMQ: реакторы CZMQ. ZLOOP.



(Начало - здесь).

Шаблон "Реактор".

<Теоория.>
Шаблон проектирования Реактор является событийно-управляемым шаблоном проектирования. Он предназначен для синхронной передачи запросов к сервису, поступающих параллельно от  одного или нескольких источников. Обработчик сервиса демультиплексирует  (разбирает) входящие запросы и  синхронно отправляет их ассоциированным обработчикам запросов.</Теория>

 Реактор напоминает процедуру окна Windows: разбирает поступившие сообщения и вызывает коллбеки, связанные с сообщениями.


Класс zloop - событийно-управляемый реактор.

 

Реактор обрабатывает элементы zmq_pollitem_t (поллеры или райтеры, сокеты) а также  однократные или циклические таймеры. Разрешение реактора составляет 1 миллисекунда. Для уменьшения нагрузки на CPU во время низкой нагрузки реактор использует безтиковые таймеры.

Создание
    Регистрация источников данных (таймеров, сокетов, поллеров) .
    Запуск
        Работа
    Остановка

Удаление 
После создания реактора в нем регистрируются сервисы и их обработчики: таймеры, сокеты, поллеры. Потом реактор запускается и начинается цикл обработки событий: срабатывание таймеров, поступление данных из сокетов или поллеров.

Пример работы реактора.
1. Создается пара сокетов: input, output.
2. В реактор загружаются:
  • таймер 1, который должен сработать через 1000 мс.
  • таймер 2, который должен сработать через 5 мс. Обработчик этого таймера уничтожает таймер 1.
  • таймер 3, который должен сработать через 20 мс. Обработчик этого таймера отправит в сокет output сообщение "ping". Из сокета output сообщение отправится в сокет input.
  • ридер сокета input, который, прочитав полученное сообщение "ping", вызовет свой обработчик, а последний просто вернет значение "-1": сигнал о том, что реактор пора остановить.
3 Реактор останавливается.

Код основной процедуры реактора:

procedure delphi_zloop_test(verbose: BOOL);
var
  input: p_zsock_t;
  loop: p_zloop_t;
  output: p_zsock_t;
  rc: Integer;
  timer_id: Integer;
begin
// Тест Реактора CZMQ (zloop).
  z_Log(' * zloop: ');
  //  Создается два PAIR сокета и соединяются по inproc
  output := zsock_new(ZMQ_PAIR);
  assert(Assigned(output));
  zsock_bind(output, 'inproc://zloop.test');
  input := zsock_new(ZMQ_PAIR);
  assert(Assigned(input));
  zsock_connect(input, 'inproc://zloop.test');

  //Создается реактор:
  loop := zloop_new();
  assert(Assigned(loop));
  zloop_set_verbose(loop, verbose);

  //  Создаем 1K mсек таймер,
  timer_id := zloop_timer(loop, 1000, 1, s_timer_event, nil);
  // ...который будет отключим 5ти mсек таймером
  zloop_timer(loop, 5, 1, s_cancel_timer_event, @timer_id);

  //  А этот таймер через 20 мсек пошлет сообщение "ping" в output
  zloop_timer(loop, 20, 1, s_timer_event, output);

  //  Ридер сокета input: при получении сообщение "ping", завершает реактор
  rc := zloop_reader(loop, input, s_socket_event, nil);
  Assert(rc = 0);
  zloop_reader_set_tolerant(loop, input);
  zloop_start(loop);

  zloop_destroy(loop);
  assert(loop = nil);

  zsock_destroy(input);
  zsock_destroy(output);
  z_Log('OK');
end;

Код обработчиков:

function s_cancel_timer_event(loop: p_zloop_t; timer_id: integer;
  arg: Pointer): integer; cdecl;
var
  cancel_timer_id: Integer;
begin
    //  Обработчик второго таймера, останавливаем таймер 1
  cancel_timer_id := pInteger(arg)^;
  Result := zloop_timer_end(loop, cancel_timer_id);
end;

function s_timer_event(loop: p_zloop_t; timer_id: integer;
  output: Pointer): integer; cdecl;
begin
  zstr_send(output, 'PING');
  Result := 0;
end;

function s_socket_event(loop: p_zloop_t; handle: p_zsock_t;
  arg: Pointer): integer; cdecl;
begin
  Result := -1; //  Просто останавливаем реактор
end;

Не забываем, что обработчики должны быть с модификатором cdecl;

Функции с суффиксом "_set_tolerant" указывают реактору, что ошибки данного источника следует игнорировать. Иначе в случае ошибки реактор просто перестанет обрабатывать данный объект:


  zloop_reader_set_tolerant(loop, input);

Вроде все просто.

...
Объекты, которые можно загрузить в "реактор" (со своими обработчиками):

1. Сокеты. Для регистрации нового сокета и его обработчика используется функция






function zloop_reader(self: p_zloop_t; sock: p_zsock_t;
  handler: zloop_reader_fn; arg: Pointer): integer; cdecl; external cZMQ_DllName;

Если сокет регистрируется повторно, каждый зарегистрированный экземпляр получит свой собственный обработчик.

2. Элементы поллера zmq_pollitem_t:




function zloop_poller(self: p_zloop_t; item: p_zmq_pollitem_t;
  handler: zloop_fn; arg: Pointer): Integer; cdecl; external cZMQ_DllName;

3. Таймеры:

function zloop_timer(self: p_zloop_t; delay: size_t; times: size_t;
  handler: zloop_timer_fn; arg: Pointer): integer; cdecl; external cZMQ_DllName; 

 При этом указывается время задержки и количество запусков таймера. Если количество запусков - 0, таймер будет работать бесконечно. Функция возвращает идентификатор таймера (1, 2, 3...).

Ура, с реактором разобрались!

Будем создавать архитектуру сети N-1, когда несколько разных клиентов асинхронно общаются с одним сервером. (Продолжение).



Комментариев нет :

Отправить комментарий