понедельник, 3 ноября 2014 г.

17. ZeroMQ: брокер с балансировкой нагрузки + CZMQ. Обработка Ctrl+C в CZMQ. Реакторы.

(Начало - здесь).

Дальше рассмотрим кодирование брокера с балансировкой нагрузки с использованием API высокого уровня - CZMQ.

Код простого брокера с балансировкой нагрузки:

program hl_LoadBalancingBrokerSimple;

{$APPTYPE CONSOLE}
// Брокер с балансировкой нагрузки.
// Демонстрируется использование CZMQ
//======================================================
// Имеется клиенты (c_NBR_CLIENTS шт) и рабочие (c_NBR_WORKERS шт).
// Каждый рабочий при запуске сообщает брокеру о своей готовности (c_WORKER_READY).
// Рабочий ставится в очередь.
//
// Клиент при запуске обращается к брокеру с заданием. Брокер отправляет
// задание первому свободному рабочему. Рабочий, выполнив задание, возвращает
// результат брокеру, брокер пересылает результат клиенту - заказчику.


uses
  SysUtils,
  zmq_h,
  czmq_h,
  ZMQ_Utils;

const
  c_NBR_CLIENTS = 5; // Число клиентов
  c_NBR_WORKERS = 2; // // Число рабочих

  // Конечные точки подключения
  c_url_clients = 'tcp://%s:5555'; //'inproc://clients';
  c_url_workers = 'tcp://%s:5556'; //'inproc://workers';
  c_domain = 'localhost'; // Сетевой адрес брокера
  c_interf: string = '*'; // Адрес для биндинга

  c_WORKER_READY: byte = 1;    // Сигнал готовности рабочего


function client_task(args: Pointer): Pointer; cdecl;
var // Функция нити клиента
  client: Pointer;
  ctx: p_zctx_t;
  reply: PChar;
begin
  ctx := args; // zctx_new();
  client := zsocket_new(ctx, ZMQ_REQ);
  zsocket_connect(client, c_url_clients, c_domain);
// Запрос - ответ
  while true do begin
    zstr_send(client, 'HELLO');
    reply := zstr_recv(client);
    if (reply = nil) then
      break;
    z_Log('Client: ' + reply);
    zstr_free(reply);
    sleep(1);
  end;
//  zctx_destroy(ctx);
  Result := nil;
end;

function worker_task(args: Pointer): Pointer; cdecl;
var // Функция нити рабочего
  ctx: p_zctx_t;
  frame: p_zframe_t;
  msg: p_zmsg_t;
  worker: Pointer;
begin
  ctx := args; // zctx_new();
  worker := zsocket_new(ctx, ZMQ_REQ);
  zsocket_connect(worker, c_url_workers, c_domain);

// Сообщаем брокеру о готовности работать
  frame := zframe_new(@c_WORKER_READY, 1);
  zframe_send(frame, worker, 0);

// ОБработка сообщений по мере их получения
  while True do begin
    msg := zmsg_recv(worker);
    if msg = nil then
      break; // Interrupted
    zframe_reset(zmsg_last(msg), PChar('OK'), 2);
    zmsg_send(msg, worker);
  end;
//  zctx_destroy(ctx);
  Result := nil;
end;



// Основная нить, порождающий дочерние
// Если нажать Ctrl-C, работа в нитях завершится, и главная нить тоже завершится.

procedure DoMain;
var
  backend: Pointer;
  ctx: p_zctx_t;
  frame: p_zframe_t;
  frontend: Pointer;
  i: Integer;
  identity: p_zframe_t;
  items: array[0..1] of zmq_pollitem_t;
  msg: p_zmsg_t;
  rc: Integer;
  workers: p_zlist_t;
begin
  ctx := zctx_new(); // Контекст
  zsys_handler_set(nil);

  frontend := zsocket_new(ctx, ZMQ_ROUTER);
  backend := zsocket_new(ctx, ZMQ_ROUTER);

  zsocket_bind(frontend, c_url_clients, c_interf);
    // Привязка к интерфейсу
  zsocket_bind(backend, c_url_workers, c_interf);

  for i := 0 to pred(c_NBR_CLIENTS) do // Запуск нитей клиентов
    zthread_new(@client_task, ctx);
  for i := 0 to Pred(c_NBR_WORKERS) do // Запуск нитей рабочих
    zthread_new(@worker_task, ctx);

// Очередь доступных рабочих
  workers := zlist_new();

 // Главный цикл балансировщика нагрузок.
 // Длинне, чем с реактором, но короче, чем на "чистом" ZMQ
  while True do begin
    items[0].socket := backend;
    items[0].fd := 0;
    items[0].events := ZMQ_POLLIN;
    items[0].revents := 0;
    items[1].socket := frontend;
    items[1].fd := 0;
    items[1].events := ZMQ_POLLIN;
    items[1].revents := 0;

// Опрашиваем клиентов только если есть незанятые рабочие
    if zlist_size(workers) > 0 then
      rc := zmq_poll(@items[0], 2, -1)
    else
      rc := zmq_poll(@items[0], 1, -1);
    if rc = -1 then
      break; // прерывание

// Обработка данных рабочих (от backend)
    if (items[0].revents and ZMQ_POLLIN) <> 0 then begin
  // Используем идентификацию
      msg := zmsg_recv(backend);
      if msg = nil then
        break; // Interrupted
      identity := zmsg_unwrap(msg);
      zlist_append(workers, identity);

// Если это не сообщение о готовности, переслать сообщение клиенту
      frame := zmsg_first(msg);
      if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then
        zmsg_destroy(msg)
      else
        zmsg_send(msg, frontend);
    end;
    if (items[1].revents and ZMQ_POLLIN) <> 0 then begin
// Получение запроса от клиента, передача первому незанятому рабочему
      msg := zmsg_recv(frontend);
      if msg <> nil then begin
        zmsg_wrap(msg, zlist_pop(workers));
        zmsg_send(msg, backend);
      end;
    end;
  end;
// Аккуратно завершаем все при выходе
  while zlist_size(workers) > 0 do begin
    frame := zlist_pop(workers);
    zframe_destroy(frame);
  end;
  zlist_destroy(workers);
  zctx_destroy(ctx);
end;

begin 
//При создании многонитевого Delphi приложения обязательно задавать
// значение True для переменной System.IsMultyThread. 
  IsMultyThread := True;
  DoMain;
  Readln;
end.

При создании многонитевого Delphi приложения обязательно задавать значение True для переменной System.IsMultyThread. Но этого делать не нужно, если прямо или косвенно вызывается процедура BeginThread, в которой также задается это значение.


Объект workers теперь - не массив, а список типа p_zlist_t.
Видна работа с фреймами:



     identity := zmsg_unwrap(msg);
     zlist_append(workers, identity);



Объект workers теперь - не массив, а список типа p_zlist_t.
Видна работа с фреймами: 


 identity := zmsg_unwrap(msg);
 zlist_append(workers, identity);

Составные сообщения теперь принимаются целиком, а не частями:
 msg := zmsg_recv(frontend);



"Чистый" ZeroMQ в случае прерывания (по Ctrl+C) вернет код завершения операции -1 и установит код ошибка EINTR.
Здесь же (в CZMQ) операция просто вернет nil:

  while true do begin
    zstr_send(client, 'HELLO');
    reply := zstr_recv(client);
    if (reply = nil) then
      break;
    z_Log('Client: ' + reply);
    zstr_free(reply);
    sleep(1);
  end;

Ну и забавные вещи вроде строк с форматированием ("Си"шный format()):

const
  // Конечные точки подключения
  c_url_clients = 'tcp://%s:5555'; //'inproc://clients';
  c_url_workers = 'tcp://%s:5556'; //'inproc://workers';
  c_domain = 'localhost'; // Сетевой адрес брокера
  c_interf: string = '*'; // Адрес для биндинга.
...
begin
...
  zsocket_connect(client, c_url_clients, c_domain);
...
  zsocket_bind(frontend, c_url_clients, c_interf);


 Отмечу, что zsocket_connect() / zsocket_bind() - процедуры с переменным числом параметров:

//  Connect a socket to a formatted endpoint
//  Returns 0 if OK, -1 if the endpoint was invalid.
function zsocket_connect(self: pointer; format: PChar): Integer; varargs; cdecl; external   cZMQ_DllName;

А так как процедуры сишные - не забываем, в случае использования констант в параметрах, указывать модификатор типа для односимвольных строк:

const
  c_interf: string = '*'; // Адрес для биндинга.

То же самое:

const
  c_WORKER_READY: byte = 1;
...

begin
...
    frame := zframe_new(@c_WORKER_READY, 1);
...
    if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then
...


То есть там, где требуется "настоящий" адрес данных - в объявлении константы указываем модификатор типа.
Или просто используем переменную.
...



Однако, несмотря на, казалось бы, все предпринятые меры, вроде этой:
     if msg = nil then
        break; // Interrupted

 - программа по Ctrl+C не прерывается. Вот почему: в CZMQ можно назначить обработчик событий SIGINT ("Ctrl+C") и SIGTERM. Если он назначен, то прерывания будут обрабатываться в нем, а обращение к сокетам не будет возвращать ошибку или nil.
При инициализации CZMQ назначен стандартный обработчик. Он очень простой:

//  Default internal signal handler
static void
s_signal_handler (int signal_value)
{
    zctx_interrupted = 1;
    zsys_interrupted = 1;
}



Стандартный обработчик устанавливает глобальные флаги zctx_interrupted и
zsys_interrupted в 1. Эти значения этих флагов доступны для чтения (а если очень будет нужно - то и для записи) и в Delphi:

function zsys_interrupted(): Integer;
function zctx_interrupted(): Integer;


 Таким образом, можно не проверять состояние сокетов, а ориентироваться на флаги.
Но раз уж мы пишем приложение, проверяющее статус завершения каждой операции, просто отключим стандартный обработчик, и программа станет реагировать на Ctrl+C. Вот так:




 zsys_handler_set(nil);

Добавим эту строчку срузу после строк

begin
  ctx := zctx_new(); // Контекст


Все заработало: при нажатии Ctrl+C приложение закрывается.

~~~~~~~~~~~~~~~~~~~~~

Теперь тот  же самый брокер с балансировкой нагрузки, но с использованием реактора CZMQ:

program hl_LoadBalancingBroker;

{$APPTYPE CONSOLE}
// Брокер с балансировкой нагрузки.
// Демонстрируется использование CZMQ и реактора
//==============================================
// Имеется клиенты (c_NBR_CLIENTS шт) и рабочие (c_NBR_WORKERS шт).
// Каждый рабочий при запуске сообщает брокеру о своей готовности (c_WORKER_READY).
// Рабочий ставится в очередь.
//
// Клиент при запуске обращается к брокеру с заданием. Брокер отправляет
// задание первому свободному рабочему. Рабочий, выполнив задание, возвращает
// результат брокеру, брокер пересылает результат клиенту - заказчику.

uses
  SysUtils
  , zmq_h
  , czmq_h
  , ZMQ_Utils
  ;

const
  c_NBR_CLIENTS = 1; // Число клиентов
  c_NBR_WORKERS = 1; // // Число рабочих

  // Конечные точки подключения
  c_url_clients = 'tcp://%s:5555'; //'inproc://clients';
  c_url_workers = 'tcp://%s:5556'; //'inproc://workers';
  c_domain = 'localhost'; // Сетевой адрес брокера
  c_interf: string = '*'; // Адрес для биндинга

  c_WORKER_READY: byte = 1; // Сигнал готовности рабочего


function client_task(args: Pointer): Pointer; cdecl;
// Функция нити клиента
var
  client: Pointer;
  ctx: p_zctx_t;
  reply: PChar;
begin
  ctx := args; // zctx_new();
  client := zsocket_new(ctx, ZMQ_REQ);
  zsocket_connect(client, c_url_clients, c_domain);
// Запрос - ответ
  while true do begin
    zstr_send(client, 'HELLO');
    reply := zstr_recv(client);
    if (reply = nil) then
      break;
    z_Log('Client: ' + reply);
    zstr_free(reply);
    sleep(1);
  end;
//  zctx_destroy(ctx);
  Result := nil;
end;

function worker_task(args: Pointer): Pointer; cdecl;
// Функция нити рабочего
var
  ctx: p_zctx_t;
  frame: p_zframe_t;
  msg: p_zmsg_t;
  worker: Pointer;
begin
  ctx := args; // zctx_new();
  worker := zsocket_new(ctx, ZMQ_REQ);
  zsocket_connect(worker, c_url_workers, c_domain);

// Сообщаем брокеру о готовности работать
  frame := zframe_new(@c_WORKER_READY, 1);
  zframe_send(frame, worker, 0);

// ОБработка сообщений по мере их получения
  while True do begin
    msg := zmsg_recv(worker);
    if msg = nil then
      break; // Interrupted
    zframe_reset(zmsg_last(msg), PChar('OK'), 2);
    zmsg_send(msg, worker);
  end;
//  zctx_destroy(ctx);
  Result := nil;
end;


// Структура, передаваемая в реактор
type
  p_lbbroker_t = ^lbbroker_t;
  lbbroker_t = packed record
    frontend: Pointer; // Сокет - слушать клиентов
    backend: Pointer; // Сокет - слушать рабочих
    workers: p_zlist_t; // Список свободных рабочих
  end;

// Устройство реактора таково, что все сообщения, приходящие в сокет,
// передаются ректором в функцию обработки. У нас - два обработчика:
// для фронтэнда (клиенты) и для бэкэнда (рабочие)

// Обработка ввода от клиентjd (на фронтэнд)

function s_handle_frontend(loop: p_zloop_t; poller: p_zmq_pollitem_t; arg: Pointer):
  Integer; cdecl;
var
  msg: p_zmsg_t;
  self: p_lbbroker_t;
begin
  self := p_lbbroker_t(arg);
  msg := zmsg_recv(self.frontend); // Сообщение от клиента
  if msg <> nil then begin
    zmsg_wrap(msg, p_zframe_t(zlist_pop(self.workers)));
    zmsg_send(msg, self.backend);
// Завершение обработчика, если нет доступных рабочих
    if zlist_size(self.workers) = 0 then begin
      poller.socket := self.frontend;
      poller.fd := 0;
      poller.events := ZMQ_POLLIN;
      poller.revents := 0;
      zloop_poller_end(loop, poller);
    end;
  end;
  Result := 0;
end;

// Обработка ввода от рабочих (на бэкэнд)

function s_handle_backend(loop: p_zloop_t; poller: p_zmq_pollitem_t; arg: Pointer):
  Integer; cdecl;
var
  frame: p_zframe_t;
  identity: p_zframe_t;
  msg: p_zmsg_t;
  self: p_lbbroker_t;
begin
// Для балансировки нагрузки снова используем идентификацию
  self := p_lbbroker_t(arg);
  msg := zmsg_recv(self.backend);
  if msg <> nil then begin
    identity := zmsg_unwrap(msg);
    zlist_append(self.workers, identity);


    if zlist_size(self.workers) = 1 then begin
    // Разрешить ридер фронтэнда
      poller.socket := self.frontend;
      poller.fd := 0;
      poller.events := ZMQ_POLLIN;
      poller.revents := 0;
      zloop_poller(loop, poller, @s_handle_frontend, self);
    end;
// Переброска сообщения клиенту, если это не "ГОТОВ".
    frame := zmsg_first(msg);
    if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then
      zmsg_destroy(msg)
    else
      zmsg_send(msg, self.frontend);
  end;
  result := 0;
end;

// Основная нить, порождающий дочерние, запускающий затем реактор.
// Если нажать Ctrl-C, реактор завершится и главная нить тоже завершится.

procedure DoMain;
var
  ctx: p_zctx_t;
  frame: p_zframe_t;
  i: Integer;
  poller: zmq_pollitem_t;
  reactor: p_zloop_t;
  self: p_lbbroker_t;
begin
  ctx := zctx_new(); // Контекст
  New(self); // Данные реактора

  self.frontend := zsocket_new(ctx, ZMQ_ROUTER); // Сокеты реактора
  self.backend := zsocket_new(ctx, ZMQ_ROUTER);

  zsocket_bind(self.frontend, c_url_clients, c_interf);
    // Привязка к интерфейсу
  zsocket_bind(self.backend, c_url_workers, c_interf);

  for i := 0 to pred(c_NBR_CLIENTS) do // Запуск нитей клиентов
    zthread_new(@client_task, ctx);
  for i := 0 to Pred(c_NBR_WORKERS) do // Запуск нитей рабочих
    zthread_new(@worker_task, ctx);

// Очередь доступных рабочих
  self.workers := zlist_new();

// Подготовка и запуск реактора
  reactor := zloop_new();

  poller.socket := self.backend;
  poller.fd := 0;
  poller.events := ZMQ_POLLIN;
  poller.revents := 0;

  zloop_poller(reactor, @poller, @s_handle_backend, self);
  zloop_start(reactor);
  zloop_destroy(reactor);

// Аккуратно завершаем все при выходе
  while zlist_size(self.workers) > 0 do begin
    frame := zlist_pop(self.workers);
    zframe_destroy(frame);
  end;
  zlist_destroy(self.workers);
  zctx_destroy(ctx);
  Dispose(self);
end;

begin 
//При создании многонитевого Delphi приложения обязательно задавать 
// значение True для переменной System.IsMultyThread. 
  IsMultyThread := True;
  DoMain;
  Readln;
end.



В данном примере мы используем РЕАКТОР CZMQ.

Вот что он позволяет.

  • Любому сокету может быть назначить ридер: код, который вызывается всякий раз, когда в сокете появляются данные, готовые для чтения.
  • Отключить ридер для сокета.
  • Назначить таймер, который будет запускаться один или несколько раз через указанные промежутки времени.
  • Отключить таймер.

Естественно, цикл опроса внутри использует zmq_poll(). Всегда, когда происходит добавление или удаление ридеров, выполняется перестраивание структуры данных для опроса и пересчитываются таймауты, чтобы найти следующий таймер.
После этого вызывается ридер и обработчики таймера для каждого сокета и таймера, которые требуют обслуживания.

Когда мы используем шаблон "Реактор", код как бы выворачивается наизнанку. Код внешне выглядит примерно так:

var
  reactor :=  p_zloop_t;
...
begin
...
  reactor := zloop_new ();
  zloop_reader(reactor, self.backend, @s_handle_backend, self);
  zloop_start (reactor);
  zloop_destroy (reactor);


Таким образом, обработка сообщений ZMQ размещена в коде специальных методов - обработчиков. При этом один и тот же обработчик может обрабатывать как активность сокета (поступление данных), так и активность таймеров сокета.
Для реализации конкретно нашей логики реактора создается специальная структура:


// Структура, передаваемая в реактор
type
  p_lbbroker_t = ^lbbroker_t;
  lbbroker_t = packed record
    frontend: Pointer; // Сокет - слушать клиентов
    backend: Pointer; // Сокет - слушать рабочих
    workers: p_zlist_t; // Список свободных рабочих
  end;

См реактор - структуру типа zloop_t. Реактор создается 

  reactor := zloop_new();


Затем в реакторе регистрируется массив поллинга, функция - обработчик и дополнительные праметры - self:

  zloop_poller(reactor, @poller, @s_handle_backend, self);
 

После этого реактор запускается.:


  zloop_start(reactor);

Почти вся логика реализуется в функциях - обработчиках.

Дальше я расскажу о реакторах CZMQ подробнее: (продолжение).



Комментариев нет :

Отправить комментарий