пятница, 9 января 2015 г.

22.6 ZeroMQ: надежные схемы "Запрос/Ответ". Сервис - ориентированная надежная очередь. Шаблон "Мажордом".

(Начало - здесь)

ZeroMQ: надежные схемы "Запрос/Ответ". Сервис - ориентированная надежная очередь. Шаблон "Мажордом".


Шаблон "Мажордом". Топология.


Прогресс ускоряется, когда в нем не участвуют юристы и разные комитеты. Одно-страничная спецификация MDP сразу  превращает PPP в нечто более солидное. Именно так и должна происходить разработка сложных архитектур: начинать следует с написания соглашений, а только потом писать софт, их реализующий.
Протокол "Мажордом" (The Majordomo Protocol - MDP) расширяет и углубляет PPP в одном интересном направлении: он добавляет "имя сервиса" к запросу, который отправляет клиент, и требует от рабочих регистрироваться для оказания конкретных услуг. Добавление имен сервиса превращает брокер из шаблона "Пират-параноик" в сервис - ориентированный брокер.



Для MDP хорошо то, что он сформировался на основе работающего  кода, наследуется от более простого протокола  (PPP), и представляет собой набор улучшений, каждое из которых решает отдельную проблему. Все это упрощает процесс разработки.
Для реализации шаблона "Мажордом" нужно написать фреймворк для клиентов и рабочих. Действительно, не очень умно добиваться от каждого разработчика приложений, чтобы он читал спецификации и добивался, чтобы они заработали, тогда они могли бы просто использовать более простой API, который за них делает работу.
И так, пока наш первое соглашение (сам MDP) определяет, как части нашей распределенной архитектуры общаются с другими частями, второе соглашение определяет, как приложения пользователя общается с технологическим фреймворком, который мы собираемся создать.
Шаблон "Мажордом" состоит из двух частей: клиент и рабочий. Так как мы собираемся писать и клиента, и рабочего, нам нужно два API.
Ниже приведен эскиз клиентского приложения, использующего API.
Примечание: используем псевдо-объектный стиль кодирования, просто потому, что в он же используется руководстве.

Код приложения "клиент":

***
 
program mdclient;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  zmq_h,
  czmq_h,
  mdcliapi in 'mdcliapi.pas',
  mdp in 'mdp.pas';

procedure DoIt;
var
  i: Integer;
  fSession: p_mdcli_t;
  fVerbose: Boolean;
  reply: p_zmsg_t;
  request: p_zmsg_t;
begin
  fVerbose := (ParamCount > 0) and (ParamStr(1) = '-v');
  fSession := mdcli_new('tcp://localhost:5555', fVerbose);

  i := 0;
  for i := 0 to 99999 do begin
    request := zmsg_new();
    zmsg_pushstr(request, 'Hello world');
    reply := mdcli_send(fSession, 'echo', request);
    if reply <> nil then
      zmsg_destroy(reply)
    else
      break; //  Прерван или отказ
  end;

  zclock_log('%d requests / replies processed'#10, i);
  mdcli_destroy(fSession);

end;
begin
  DoIt;
  Readln;
end.

Всё. :)

Мы открываем сессию связи с брокером, в цикле шлем запрос, получаем ответ, and в конце закрываем сессию.

А вот код приложения "рабочий":

 
program mdworker;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  zmq_h,
  czmq_h,
  mdwrkapi in 'mdwrkapi.pas',
  mdp in 'mdp.pas';

procedure DoIt;
var
  fVerbose: Boolean;
  reply: p_zmsg_t;
  request: p_zmsg_t;
  session: p_mdwrk_t;
begin
  fVerbose := (ParamCount > 0) and (ParamStr(1) = '-v');
  session := mdwrk_new('tcp://localhost:5555', 'echo', fVerbose);
  reply := nil;
  while (true) do begin
    request := mdwrk_recv(session, reply);
    if (request = nil) then
      break; //  Рабочий был прерван
    reply := request; //  Эхо ... :-)
  end;
  mdwrk_destroy(session);
  Exit;

end;
begin
  doIt;
  Readln;
end.

В принципе, то же самое, но с небольшими отличиями. Сначала рабочий выполняет recv(), который передает брокеру пустой ответ. Затем рабочий в цикле возвращает текущий ответ и получает новый запрос. API клиента и рабочего создать очень просто, так как они основаны на коде шаблона "Пират-Параноик", который был отлажен ранее.
Константы протокола Majordomo определены в модуле mdp.pas: :

 
unit mdp;

interface
//  Константы протокола Majordomo

//  Версия MDP/Client
const
  cMDPC_CLIENT = 'MDPC01';

//  Версия MDP/Worker
  cMDPW_WORKER = 'MDPW01';

//  Команды MDP/Server

  cMDPW_READY = #01;
  cMDPW_REQUEST = #02;
  cMDPW_REPLY = #03;
  cMDPW_HEARTBEAT = #04;
  cMDPW_DISCONNECT = #05;

  mdps_commands: array[0..5] of PChar =
  ('', 'READY', 'REQUEST', 'REPLY', 'HEARTBEAT', 'DISCONNECT');

implementation

end.

А вот так реализован сам модуль клиентского API: :

unit mdcliapi;

interface
uses
  mdp
  , zmq_h
  , czmq_h
  , ZMQ_Utils;


type

  p_mdcli_t = ^mdcli_t;
  mdcli_t = record
    ctx: p_zctx_t; //  Контекст
    broker: PChar;
    client: Pointer; //  Сокет для связи с брокером
    verbose: Boolean; //  Протоколирвоание в stdout
    timeout: integer; //  Таймаут запроса
    retries: integer; //  Число попыток

  end;

function mdcli_new(broker: PChar; verbose: Boolean): p_mdcli_t;
procedure mdcli_destroy(var self: p_mdcli_t);

function mdcli_send(self: p_mdcli_t; service: PChar; var request: p_zmsg_t):
  p_zmsg_t;

procedure s_mdcli_connect_to_broker(self: p_mdcli_t);

implementation

uses
  SysUtils;

function mdcli_new(broker: PChar; verbose: Boolean): p_mdcli_t;
var
  self: p_mdcli_t;
begin
  Assert(broker <> nil);
  self := safe_malloc(SizeOf(mdcli_t));
  self.ctx := zctx_new();

  self.broker := zStrdup(broker);
  self.verbose := verbose;
  self.timeout := 2500; //  Миллисекунд
  self.retries := 3; //  Попыток до отказа

  s_mdcli_connect_to_broker(self);
  Result := self;

end;

function mdcli_send(self: p_mdcli_t; service: PChar; var request: p_zmsg_t):
  p_zmsg_t;
var
  header: p_zframe_t;
  item: zmq_pollitem_t;
  msg: p_zmsg_t;
  rc: Integer;
  reply_service: p_zframe_t;
  retries_left: Integer;
begin
  assert(self <> nil);
  assert(request <> nil);

  //  В соответствии с требованиями протокола, должны быть фреймы-префиксы
  //  Фрейм 1: "MDPCxy" (шесть байт, MDP/Client x.y)
  //  Фрейм 2: Имя сервиса (печатная строка)
  zmsg_pushstr(request, service);
  zmsg_pushstr(request, cMDPC_CLIENT);
  if self.verbose then begin
    zclock_log('I: send request to ''%s'' service:', service);
    zmsg_print(request);
  end;
  retries_left := self.retries;
  while (retries_left > 0) and (zctx_interrupted = 0) do begin
    msg := zmsg_dup(request);
    zmsg_send(msg, self.client);

    zPollItemInit(item, self.client, 0, ZMQ_POLLIN, 0);

    //  При любом блокирующем вызове (libzmq), в случае ошибки  будет -1;
    //  Теоретически, следует учесть разные коды завершения,
    //  но на практике достаточно учесть {EINTR} (Ctrl-C):

    rc := zmq_poll(@item, 1, self.timeout * ZMQ_POLL_MSEC);
    if rc = -1 then
      break; //  Прерван

    //  Что-то приняли, обрабатываем
    if (item.revents and ZMQ_POLLIN) <> 0 then begin
      msg := zmsg_recv(self.client);
      if self.verbose then begin
        zclock_log('I: received reply:');
        zmsg_print(msg);
      end;

      //  В реальном коде обрабботка отказа должна быть более тщательной
       assert(zmsg_size(msg) >= 3);

      header := zmsg_pop(msg);
      assert(zframe_streq(header, cMDPC_CLIENT));
      zframe_destroy(header);

      reply_service := zmsg_pop(msg);
      assert(zframe_streq(reply_service, service));
      zframe_destroy(&reply_service);

      zmsg_destroy(request);
      Result := msg; //  Успех
      Exit;
    end
    else if z_Dec(retries_left) > 0 then begin
      if (self.verbose) then
        zclock_log('W: no reply, reconnecting...');
      s_mdcli_connect_to_broker(self);

    end
    else begin
      if (self.verbose) then
        zclock_log('W: permanent error, abandoning');
      break; //  Всё

    end;
  end;
  if zctx_interrupted <> 0 then
    zclock_log('W: interrupt received, killing client...'#10);
  zmsg_destroy(request);
  Result := nil;
end;

procedure mdcli_destroy(var self: p_mdcli_t);
begin
  assert(self <> nil);
  if self <> nil then begin
    zctx_destroy(self.ctx);
    safe_memfree(self.broker);
    safe_memfree(self);
    self := nil;
  end;
end;

procedure s_mdcli_connect_to_broker(self: p_mdcli_t);
begin
  if self.client <> nil then
    zsocket_destroy(self.ctx, self.client);
  self.client := zsocket_new(self.ctx, ZMQ_REQ);
  zmq_connect(self.client, self.broker);
  if self.verbose then
    zclock_log('I: connecting to broker at %s...', self.broker);
end;

end.


Код реализации API рабочего: :

 
unit mdwrkapi;

interface
uses
  zmq_h
  , czmq_h;
type
  p_mdwrk_t = ^mdwrk_t;
  mdwrk_t = record
    ctx: p_zctx_t; //  Контекст
    broker: PChar;
    service: PChar;
    worker: Pointer; //  Сокет для связи с брокером
    verbose: Boolean; //  Протоколирование в stdout

    //  Управление хартбитингом
    heartbeat_at: uint64_t; //  Когда слать хартбит
    liveness: size_t; //  Сколько осталось попыток
    heartbeat: integer; //  Задержка харбитинга, миллисекунд
    reconnect: Integer; //  Задержка реконнекта, миллисекунд

    expect_reply: Boolean; //  False только сначала, из-за calloc()
    reply_to: p_zframe_t; //  Идентификация для возврата, если есть

  end;
const
  //  Параметры надежности
  cHEARTBEAT_LIVENESS = 3; //  Обычно 3-5 попыток


function mdwrk_new(broker: PChar; service: PChar; verbose: Boolean):
  p_mdwrk_t;
procedure mdwrk_destroy(var self: p_mdwrk_t);
function mdwrk_recv(self: p_mdwrk_t; var reply: p_zmsg_t): p_zmsg_t;

implementation

uses
  ZMQ_Utils, mdp;

procedure mdwrk_destroy(var self: p_mdwrk_t);
begin
  assert(self <> nil);
  if self <> nil then begin
    zctx_destroy(self.ctx);
    zstr_free(self.broker);
    zstr_free(self.service);
    safe_memfree(self);
    self := nil;
  end;
end;


procedure s_mdwrk_send_to_broker(self: p_mdwrk_t; command: PChar;
  option: PChar; msg: p_zmsg_t);
//  Отправка сообщения брокеру
//  Если сообщения нет (пусто), создать его самостоятельно
begin
  if msg <> nil then
    msg := zmsg_dup(msg)
  else
    msg := zmsg_new();

  //  Формирование конверта сообщения в сответствии с протоколом
  if option <> nil then
    zmsg_pushstr(msg, option);
  zmsg_pushstr(msg, command);
  zmsg_pushstr(msg, cMDPW_WORKER);
  zmsg_pushstr(msg, '');

  if self.verbose then begin
    zclock_log('I: sending %s to broker',
      mdps_commands[Byte(command^)]);
    zmsg_print(msg);

  end;
  zmsg_send(msg, self.worker);
end;


procedure s_mdwrk_connect_to_broker(self: p_mdwrk_t);
//  Коннект или реконнект к брокеру
begin
  if self.worker <> nil then // Полный дисконнект, с разрушенеим сокета
    zsocket_destroy(self.ctx, self.worker);
  self.worker := zsocket_new(self.ctx, ZMQ_DEALER);
  zmq_connect(self.worker, self.broker);
  if self.verbose then
    zclock_log('I: connecting to broker at %s...', self.broker);

  //  Регистрация сервиса в брокере
  s_mdwrk_send_to_broker(self, cMDPW_READY, self.service, nil);

  //  Если число попыток (liveness) обнулилось, считаем, что брокер отключен
  self.liveness := cHEARTBEAT_LIVENESS;
  self.heartbeat_at := zclock_time() + self.heartbeat;

end;



function mdwrk_new(broker: PChar; service: PChar; verbose: Boolean):
  p_mdwrk_t;
var
  self: p_mdwrk_t;
begin
  assert(broker <> nil);
  assert(service <> nil);

  self := safe_malloc(sizeof(mdwrk_t));
  self.ctx := zctx_new();
  self.broker := zStrdup(broker);
  self.service := zStrdup(service);
  self.verbose := verbose;
  self.heartbeat := 2500; //  msecs
  self.reconnect := 2500; //  msecs

  s_mdwrk_connect_to_broker(self);
  result := self;
end;

function mdwrk_recv(self: p_mdwrk_t; var reply: p_zmsg_t): p_zmsg_t;
//  Метод сперва отсылает брокеру ответ, а затем ждет нового запроса.
//  Название метода странное, угу.

//  Отправляет ответ брокеру, если он есть и ждет следующего запроса

var
  command: p_zframe_t;
  empty: p_zframe_t;
  header: p_zframe_t;
  item: zmq_pollitem_t;
  msg: p_zmsg_t;
  rc: Integer;
begin
  //  Формирование и отсылка ответа (reply), если он пустой

  //  assert(reply <> nil);
  //    zmsg_t *reply = *reply_p;
  assert((reply <> nil) or (not self.expect_reply));
  if (reply <> nil) then begin
    assert(self.reply_to <> nil);
    zmsg_wrap(reply, self.reply_to);
    s_mdwrk_send_to_broker(self, cMDPW_REPLY, nil, reply);
    zmsg_destroy(reply);
  end;

  self.expect_reply := true;

  while (true) do begin


    zPollItemInit(item, self.worker, 0, ZMQ_POLLIN, 0);
    rc := zmq_poll(@item, 1, self.heartbeat * ZMQ_POLL_MSEC);
    if rc = -1 then
      break; //  Прерван

    if (item.revents and ZMQ_POLLIN) <> 0 then begin
      msg := zmsg_recv(self.worker);
      if msg = nil then
        break; //  Прерван
      if self.verbose then begin
        zclock_log('I: received message from broker:');
        zmsg_print(msg);
      end;




      self.liveness := cHEARTBEAT_LIVENESS;

      //  Ошибки не обрабатываем, просто шумим assert-от
      assert(zmsg_size(msg) >= 3);

      empty := zmsg_pop(msg);
      assert(zframe_streq(empty, ''));
      zframe_destroy(empty);

      header := zmsg_pop(msg);
      assert(zframe_streq(header, cMDPW_WORKER));
      zframe_destroy(header);

      command := zmsg_pop(msg);
      if (zframe_streq(command, cMDPW_REQUEST)) then begin
        //  Вообще-то, мы должны извлечь и сохранить все адреса вплоть до
        // пустого, но сейчас сохраняем просто один...
        self.reply_to := zmsg_unwrap(msg);
        zframe_destroy(command);

        //  В этом месте мы должны обрабатывать сообщение
        //  Сейчас просто возвращаем его в приложение

        Result := msg; //  Вот и запрос на обработку
        Exit;

      end
      else
        if (zframe_streq(command, cMDPW_HEARTBEAT)) then
          // ;               //  В случае хартбита ничего не делаем
        else
          if (zframe_streq(command, cMDPW_DISCONNECT)) then
            s_mdwrk_connect_to_broker(self)
          else begin
            zclock_log('E: invalid input message');
            zmsg_print(msg);
          end;
      zframe_destroy(&command);
      zmsg_destroy(&msg);

    end
    else
      if z_Dec(self.liveness) = 0 then begin
        if self.verbose then
          zclock_log('W: disconnected from broker - retrying...');
        zclock_sleep(self.reconnect);
        s_mdwrk_connect_to_broker(self);
      end;
    //  Отправить хартбит, если пора
    if zclock_time() > self.heartbeat_at then begin
      s_mdwrk_send_to_broker(self, cMDPW_HEARTBEAT, nil, nil);
      self.heartbeat_at := zclock_time() + self.heartbeat;
    end;

  end;
  if zctx_interrupted() <> 0 then
    z_log('W: interrupt received, killing worker...'#10);
  Result := nil;
end;

end.


Замечания по поводу кода API рабочего:

  • API - однонитевое. Это означает, что, например, рабочий не может послать хартбит в фоне. К счастью, это как раз то, что нам нужно: если рабочий зависает, харбитинг останавливается и брокер перестает перестает отправлять запросы рабочему. 
  • API рабочего не выполняет экпоненциальный back-off, это усложнит реализацию.
  • API не формирует отчеты об ошибках. Если чего-то не хватает, формируется исключение. Это идеально для эталонной реализации, любые ошибки протокола  становятся сразу видны. Для реальных же приложений, API должен быть устойчивым к появлению неправильных сообщений.
Может возникнуть вопрос: для чего API рабочего при автоматическом реконнекте к брокеру вручную закрывает сокет связи с брокером и открывает новый. Чтобы понять, для чего это сделано, вспомним шаблон "Простой пират" и "Пират-параноик". Из-за того, что ZeroMQ будет выполнять автоматический реконнект рабочих, когда брокер умирает и восстанавливается, это не позволит перерегистрировать рабочих на брокере. Есть как минимум два решения. Простейшее, которое реализовано здесь, заключается в том, что рабочий мониторит соединение с помощью хартбитинга, и если рабочий решит, что брокер мертв, то закрывает его сокет и начинает работу заново с новым сокетом. Альтернативным решением является изучение брокером неизвестных рабочих, когда он получает хартбиты от рабочих и отправка рабочим запроса на перерегистрацию. Это должно поддерживаться протоколом.

Теперь разработаем брокер нашего шаблона "Мажордом". В основе его структуры будет набор очередей, по одной на сервис. Мы будем создавать эти очереди при появлении рабочих (при пропадании рабочих, мы могли бы удалять их, но пока не станем усложнять себе задачу). Дополнительно, мы мы будем хранить очередь рабочих для каждого сервиса.

Код брокера:

program mdbrocker;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  mdbrkapi in 'mdbrkapi.pas',
  zmq_h,
  czmq_h,
  ZMQ_Utils,
  mdp in 'mdp.pas';

procedure doIt;
//  Задача брокера. СОздаем экземпляр брокера и
//  обрабатываем сообщения, поступающие в сокет брокера
var
  empty: p_zframe_t;
  fVerbose: Boolean;
  header: p_zframe_t;
  item: zmq_pollitem_t;
  msg: p_zmsg_t;
  rc: Integer;
  self: p_broker_t;
  sender: p_zframe_t;
  worker: p_worker_t;
begin
  fVerbose := (ParamCount > 0) and (ParamStr(1) = '-v');
  self := s_broker_new(fVerbose);
  s_broker_bind(self, 'tcp://*:5555');

  //  Получение и обработка сообщений, пока не случится прерывание
  while true do begin
    zPollItemInit(item, self.socket, 0, ZMQ_POLLIN, 0);
    rc := zmq_poll(@item, 1, cHEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
    if rc = -1 then
      break; // Прерван
    //  Если есть входное сообщение, обрабатываем
    if (item.revents and ZMQ_POLLIN) <> 0 then begin
      msg := zmsg_recv(self.socket);
      if msg = nil then
        break; //  Прерван
      if self.verbose then begin
        zclock_log('I: received message:');
        zmsg_print(msg);
      end;
      sender := zmsg_pop(msg);
      empty := zmsg_pop(msg);
      header := zmsg_pop(msg);

      if (zframe_streq(header, cMDPC_CLIENT)) then
        s_broker_client_msg(self, sender, msg)
      else
        if (zframe_streq(header, cMDPW_WORKER)) then
          s_broker_worker_msg(self, sender, msg)
        else begin
          zclock_log('E: invalid message: ');
          zmsg_print(msg);
          zmsg_destroy(msg);
        end;

      zframe_destroy(sender);
      zframe_destroy(empty);
      zframe_destroy(header);
    end;

    //  Дисконнект и удаление всех умерших рабочих
    //  Если нужно - отправка хартбита ожидающим рабочим.
    if (zclock_time() > self.heartbeat_at) then begin
      s_broker_purge(self);
      worker := zlist_first(self.waiting);
      while worker <> nil do begin
        s_worker_send(worker, cMDPW_HEARTBEAT, nil, nil);
        worker := zlist_next(self.waiting);
      end;
      self.heartbeat_at := zclock_time() + cHEARTBEAT_INTERVAL;
    end;

  end;
  if (zctx_interrupted <> 0) then
    zclock_log('W: interrupt received, shutting down...');
  s_broker_destroy(self);
end;

begin
  doIt;
  Readln;
end.

Да, код брокера чуть посложней. Чуть.

В цикле  из сокета читаются сообщения. Отбрасываются два служебных фрейма, извлекается заголовок (третий фрейм):

      sender := zmsg_pop(msg);
      empty := zmsg_pop(msg);
      header := zmsg_pop(msg);

В зависимости от заголовка, сообщение обрабатывается как от клиента либо как от рабочего.
      if (zframe_streq(header, cMDPC_CLIENT)) then
        s_broker_client_msg(self, sender, msg)
      else
        if (zframe_streq(header, cMDPW_WORKER)) then
          s_broker_worker_msg(self, sender, msg)

Затем, если пришло время, всем живым рабочим отправляется хартбит:
    if (zclock_time() > self.heartbeat_at) then begin
      s_broker_purge(self);
      worker := zlist_first(self.waiting);
      while worker <> nil do begin
        s_worker_send(worker, cMDPW_HEARTBEAT, nil, nil);
        worker := zlist_next(self.waiting);
      end;
      self.heartbeat_at := zclock_time() + cHEARTBEAT_INTERVAL;
    end;

Вот и все.
Код реализации API брокера: :

 
unit mdbrkapi;

interface
uses
  mdp, czmq_h, zmq_h, ZMQ_Utils;


const

  //  Обычно эти данные берутся из конфигурационных файлов и т.п.
  cHEARTBEAT_LIVENESS = 3; // Обычно 3-5
  cHEARTBEAT_INTERVAL = 2500; //  миллисекунд
  cHEARTBEAT_EXPIRY = cHEARTBEAT_INTERVAL * cHEARTBEAT_LIVENESS;


type
  //  Класс брокера
  p_broker_t = ^broker_t;
  broker_t = record
    ctx: p_zctx_t; //  Контекст
    socket: Pointer; //  Сокет для клиентов и рабочих
    verbose: boolean; //  Протоколирование в stdout
    endpoint: PChar; //  Брокер привязывается к данной конечной точке
    services: p_zhash_t; //  Хэш известных сервисов
    workers: p_zhash_t; //  Хэш известных рабочих
    waiting: p_zlist_t; //  Список ждущих рабочих
    heartbeat_at: uint64_t; //  Когда отправлять HEARTBEAT
  end;

  //  Класс сервиса

  p_service_t = ^service_t;
  service_t = record
    broker: p_broker_t; // Экземпляр брокера
    name: PChar; //  Имя сервиса
    requests: p_zlist_t; //  Список запросов клиента
    waiting: p_zlist_t; //  Список ожидающих рабочих
    workers: size_t; //  Сколько у нас есть рабочих
  end;


  //  Класс рабочего

  p_worker_t = ^worker_t;
  worker_t = record
    broker: p_broker_t; // Экземпляр брокера
    id_string: PChar; // Идентификация брокера в виде строки
    identity: p_zframe_t; // Идентификация фрейма для маршрутизации
    service: p_service_t; // Принадлежность сервиса, если известна
    expiry: int64_t; // Когда сдохнет рабочий, если нет хартбита
  end;


function s_broker_new(verbose: Boolean): p_broker_t;
procedure s_broker_destroy(var self: p_broker_t);
procedure s_broker_bind(self: p_broker_t; endpoint: PChar);
procedure s_broker_worker_msg(self: p_broker_t; sender: p_zframe_t; msg:
  p_zmsg_t);
procedure s_broker_client_msg(self: p_broker_t; sender: p_zframe_t; msg:
  p_zmsg_t);
procedure s_broker_purge(self: p_broker_t);
//-----
function s_service_require(self: p_broker_t; service_frame: p_zframe_t):
  p_service_t;
procedure s_service_destroy(argument: Pointer);
procedure s_service_dispatch(self: p_service_t; msg: p_zmsg_t);

//----
function s_worker_require(self: p_broker_t; identity: p_zframe_t): p_worker_t;
procedure s_worker_delete(self: p_worker_t; disconnect: Boolean);
procedure s_worker_destroy(argument: Pointer);
procedure s_worker_send(self: p_worker_t; command: PChar; option: PChar; msg:
  p_zmsg_t);
procedure s_worker_waiting(self: p_worker_t);






implementation

uses
  SysUtils;

procedure s_broker_bind(self: p_broker_t; endpoint: PChar);
//  Привязывает брокер к конченой точке.
//  Можно вызывать многократно.
// ПРИМЕЧАНИЕ:  MDP использует один сокет для клиентов и для рабочих
begin
  zsocket_bind(self.socket, endpoint);
  zclock_log('I: MDP broker/0.2.0 is active at %s', endpoint);
end;

procedure s_broker_client_msg(self: p_broker_t; sender: p_zframe_t; msg:
  p_zmsg_t);
//  Обработка запроса от клиента. Запросы MMI реализованы прямо здесь
//  (в данный момент, мы реализовали только запрос):

var
  client: p_zframe_t;
  name: PChar;
  return_code: PChar;
  service: p_service_t;
  service_frame: p_zframe_t;
begin
  assert(zmsg_size(msg) >= 2); //  Имя сервиса + тело

  service_frame := zmsg_pop(msg);
  service := s_service_require(self, service_frame);

  //  Установить идентификатор клиента для ответа
  zmsg_wrap(msg, zframe_dup(sender));

  //  При получении запроса к сервису MMI обработать его внутри
  if (zframe_size(service_frame) >= 4)
    and CompareMem(zframe_data(service_frame), PChar('mmi.'), 4) then begin
    if zframe_streq(service_frame, 'mmi.service') then begin
      name := zframe_d_strdup(zmsg_last(msg));
      service := zhash_lookup(self.services, name);

      if service <> nil then
        if service.workers <> 0 then
          return_code := '200'
        else
          return_code := '400'
      else
        return_code := nil;

      zstr_free(name);
    end
    else
      return_code := '501';

    zframe_reset(zmsg_last(msg), return_code, strlen(return_code));

    //  Удаление и сохранение конверта ответа для клиента и вставка
    //  заголовка протокола и имени сервиса, затем перепаковка конверта
    client := zmsg_unwrap(msg);
    zmsg_prepend(msg, service_frame);
    zmsg_pushstr(msg, cMDPC_CLIENT);
    zmsg_wrap(msg, client);
    zmsg_send(msg, self.socket);
  end
  else
    //  Иначе перенаправить сообщение запрошенному сервису
    s_service_dispatch(service, msg);
  zframe_destroy(service_frame);
end;

procedure s_broker_destroy(var self: p_broker_t);
//  Деструктор брокера
begin
  assert(self <> nil);
  if (self <> nil) then begin
    zctx_destroy(self.ctx);
    zhash_destroy(self.services);
    zhash_destroy(self.workers);
    zlist_destroy(self.waiting);
    safe_memfree(self);
    self := nil;
  end;
end;

function s_broker_new(verbose: Boolean): p_broker_t;
//  Конструктор брокера
begin
  Result := safe_malloc(sizeof(broker_t));
  //  Инициализация состояния брокера
  Result.ctx := zctx_new();
  Result.socket := zsocket_new(Result.ctx, ZMQ_ROUTER);
  Result.verbose := verbose;
  Result.services := zhash_new();
  Result.workers := zhash_new();
  Result.waiting := zlist_new();
  Result.heartbeat_at := zclock_time() + cHEARTBEAT_INTERVAL;
end;

procedure s_broker_purge(self: p_broker_t);
//  Метод удаляет ожидающих рабочих, которые нас не пингуют.
//  Рабочие хранятся от самых старых к наиболее часто используемым, поэтому мы
//  можем остановить сканирование, как только будет найден живой рабочий.
//  Таким образом, мы чаще всего останавлиаемся на первом рабочем, который
//  наиболее важен в случае, когда рабочих много.
//  Этот метод вызывается из service_dispatch.
var
  worker: p_worker_t;
begin
  worker := zlist_first(self.waiting);
  while worker <> nil do begin
    if zclock_time() < worker.expiry then
      break; //  Рабочий еще жив, прекращаем сканирование
    if self.verbose then
      zclock_log('I: deleting expired worker: %s', worker.id_string);
    s_worker_delete(worker, False);
    worker := zlist_first(self.waiting);
  end;
end;

procedure s_broker_worker_msg(self: p_broker_t; sender: p_zframe_t; msg:
  p_zmsg_t);
//  Метод обрабатывает сообщения READY, REPLY, HEARTBEAT, DISCONNECT,
// полученные брокером
var
  client: p_zframe_t;
  command: p_zframe_t;
  id_string: PChar;
  service_frame: p_zframe_t;
  worker: p_worker_t;
  worker_ready: Boolean;
begin
  assert(zmsg_size(msg) >= 1); //  Как минимум, это команда

  command := zmsg_pop(msg);
  id_string := zframe_strhex(sender);
  worker_ready := (zhash_lookup(self.workers, id_string) <> nil);
  zstr_free(id_string);
  worker := s_worker_require(self, sender);

  if zframe_streq(command, cMDPW_READY) then begin
    if worker_ready then //  Не первая команда в сессии
      s_worker_delete(worker, True)
    else
      if (zframe_size(sender) >= 4) //  Зарезервированное имя сервиса
      and CompareMem(zframe_data(sender), PChar('mmi.'), 4) then
        s_worker_delete(worker, True)
      else begin
        //  Присоеденить рабочего к сервису и пометить как отдыхающего
        service_frame := zmsg_pop(msg);
        worker.service := s_service_require(self, service_frame);
        s_Inc(worker.service.workers);
        s_worker_waiting(worker);
        zframe_destroy(service_frame);
      end
  end else if zframe_streq(command, cMDPW_REPLY) then begin
    if (worker_ready) then begin
      //  Удаление и сохранение обратного конверта клиента, вставка заголовка
      // по протоколу и имени сервиса, перепаковка конверта.
      client := zmsg_unwrap(msg);
      zmsg_pushstr(msg, worker.service.name);
      zmsg_pushstr(msg, cMDPC_CLIENT);
      zmsg_wrap(msg, client);
      zmsg_send(&msg, self.socket);
      s_worker_waiting(worker);
    end
    else
      s_worker_delete(worker, True)
  end else if zframe_streq(command, cMDPW_HEARTBEAT) then
    if worker_ready then
      worker.expiry := zclock_time() + cHEARTBEAT_EXPIRY
    else
      s_worker_delete(worker, True)
  else if zframe_streq(command, cMDPW_DISCONNECT) then
    s_worker_delete(worker, False)
  else begin
    zclock_log('E: invalid input message');
    zmsg_print(msg);
  end;
  //  safe_memfree(command);
//  zframe_destroy(command);
  zstr_free(PChar(command));
  zmsg_destroy(msg);
end;

procedure s_service_destroy(argument: Pointer);
//  Деструктор сервиса, вызывается автоматически всякий раз, когда сервис
//  удаляется из broker.services.
var
  msg: p_zmsg_t;
  service: p_service_t;
begin
  service := argument;
  while zlist_size(service.requests) > 0 do begin
    msg := zlist_pop(service.requests);
    zmsg_destroy(msg);
  end;
  zlist_destroy(service.requests);
  zlist_destroy(service.waiting);
  zstr_free(service.name);
  safe_memfree(service);
end;

procedure s_service_dispatch(self: p_service_t; msg: p_zmsg_t);
//  This method sends requests to waiting workers:
var
  f_msg: p_zmsg_t;
  worker: p_worker_t;
begin
  assert(self <> nil);
  if msg <> nil then //  Queue message if any
    zlist_append(self.requests, msg);

  s_broker_purge(self.broker);
  while (zlist_size(self.waiting) > 0) and (zlist_size(self.requests) > 0) do begin
    worker := zlist_pop(self.waiting);
    zlist_remove(self.broker.waiting, worker);
    f_msg := zlist_pop(self.requests);
    s_worker_send(worker, cMDPW_REQUEST, nil, f_msg);
    zmsg_destroy(f_msg);
  end;
end;

function s_service_require(self: p_broker_t; service_frame: p_zframe_t):
  p_service_t;
//  Конструктор - ленивка, который ищет сервис по имени или создает новый сервис,
//  если сервиса с таким именем нет.
var
  name: PChar;
  service: p_service_t;
begin
  assert(service_frame <> nil);
  name := zframe_d_strdup(service_frame);

  service := zhash_lookup(self.services, name);
  if service = nil then begin
    service := safe_malloc(sizeof(service_t));
    service.broker := self;
    service.name := name;
    service.requests := zlist_new();
    service.waiting := zlist_new();
    zhash_insert(self.services, name, service);
    zhash_freefn(self.services, name, @s_service_destroy);
    if self.verbose then
      zclock_log('I: added service: %s', name);
  end
  else
    zstr_free(name);

  Result := service;
end;

procedure s_worker_delete(self: p_worker_t; disconnect: Boolean);
//  Удаляет рабочего
begin
  assert(self <> nil);
  if disconnect then
    s_worker_send(self, cMDPW_DISCONNECT, nil, nil);

  if self.service <> nil then begin
    zlist_remove(self.service.waiting, self);
    Dec(self.service.workers);
  end;
  zlist_remove(self.broker.waiting, self);
  //  Сие автоматически вызывает и s_worker_destroy
  zhash_delete(self.broker.workers, self.id_string);
end;

procedure s_worker_destroy(argument: Pointer);
//  Деструктор рабочего, вызывается автоматически при удалении рабочего из
//  broker.workers.
var
  self: p_worker_t;
begin
  self := argument;
  zframe_destroy(self.identity);
  zstr_free(self.id_string);
  safe_memfree(self);
end;

function s_worker_require(self: p_broker_t; identity: p_zframe_t): p_worker_t;
//  Конструктор - ленивка, который находит рабочего по идентификации, либо
// создает рабочего, если рабочего с такой идентификацией не найдено.
var
  id_string: PChar;
  worker: p_worker_t;
begin
  assert(identity <> nil);

  //  workers - предоставляет доступ к рабочим по их идентификациям
  id_string := zframe_strhex(identity);
  worker := zhash_lookup(self.workers, id_string);

  if worker = nil then begin
    worker := safe_malloc(sizeof(worker_t));
    worker.broker := self;
    worker.id_string := id_string;
    worker.identity := zframe_dup(identity);
    zhash_insert(self.workers, id_string, worker);
    zhash_freefn(self.workers, id_string, @s_worker_destroy);
    if self.verbose then
      zclock_log('I: registering new worker: %s', id_string);
  end
  else
    zstr_free(id_string);
  Result := worker;
end;

procedure s_worker_send(self: p_worker_t; command: PChar; option: PChar;
  msg: p_zmsg_t);
//  Формирует и отправляет команду рабочему. При вызове метода также можно
//  предоставить дополнителный параметр (option) к команде (command) и
//  дополнительно догрузить сообщение:
begin
  if msg = nil then
    msg := zmsg_new()
  else
    msg := zmsg_dup(msg);

  //  Упаковка конверта сообщения в соответствии со стеком протокола
  if option <> nil then
    zmsg_pushstr(msg, option);
  zmsg_pushstr(msg, command);
  zmsg_pushstr(msg, cMDPW_WORKER);

  //  Для маршрутизации сообщения:
  zmsg_wrap(msg, zframe_dup(self.identity));

  if self.broker.verbose then begin
    zclock_log('I: sending %s to worker', mdps_commands[Byte(command^)]);
    zmsg_print(msg);
  end;
  zmsg_send(msg, self.broker.socket);
end;

procedure s_worker_waiting(self: p_worker_t);
//  Этот рабочий ждет работу
begin
  //  Списки ждущих очередей от брокера и сервиса
  assert(self.broker <> nil);
  zlist_append(self.broker.waiting, self);
  zlist_append(self.service.waiting, self);
  self.expiry := zclock_time() + cHEARTBEAT_EXPIRY;
  s_service_dispatch(self.service, nil);
end;


end.


Несколько замечаний по коду брокера:

  • Протокол "Мажордом" позволяет обслужить клиентов и рабочих с помощью одного сокета. Это упрощает развертывание и обслуживание брокера. Обычно другие прокси требуют два сокета.
  • Брокер реализует все требования протокола MDP/0.1, включая отключение, когда брокер посылает неправильные команды, хартбиты и прочее.
  • Брокер может быть легко расширен до многонитевого, каждая нить могла бы обслуживать один сокет и один набор клиентов и рабочих. Такой подход был бы интересен для сегментирования объемных сетевых структур. Псевдоклассовая организация кода позволяет довольно легко сделать это. 
  • Модель надежности типа "основной/резервный" или "живой/живой" для брокера легко реализуется, так как брокер фактически не хранит состояние корреспондентов за исключением наличия сервиса. Это позволяет клиентам и рабочим переключиться на другого брокера, когда первый недоступен. 
  • В примере используется пятисекундный хартбитинг, в основном для того, чтобы уменьшить время вывода при отладке. Реальные значения могут быть ниже для большинства сетевых приложений. Тем не менее, любой перезапрос должен быть достаточно медленным, чтобы позволить сервису перезапуститься. Например, 10 секунд (или дольше).
По поводу "псевдообъектного" стиля реализации. Совсем несложно использовать возможности языка и использовать "настоящие" классы и объекты: (Примеры "объектного" кода клиента и рабочего)

3 комментария :

  1. Здравствуйте, s_Dec (retries_left) компилятор не проходит, определения функций удаления s_dec

    ОтветитьУдалить
  2. [dcc32 Error] mdcliapi.pas(115): E2003 Undeclared identifier: 's_Dec'
    [dcc32 Error] mdcliapi.pas(45): E2003 Undeclared identifier: 's_strdup'

    ОтветитьУдалить
  3. Да, был обновлвен zmq_utils, а я забыл выложить обновление, можно скачать:
    https://drive.google.com/file/d/0B2vGKHGkken9Nlo1ZVRwOG5mSXc/view

    Функция z_Dec() выполняет декремент и возвращает результат.
    Функция zstrdup() копирует строки, используя менеджер памяти .dll библиотеки ZeroMQ.

    ОтветитьУдалить