суббота, 10 января 2015 г.

22.6.1 ZeroMQ: надежные схемы "Запрос/Ответ". Сервис - ориентированная надежная очередь. Шаблон "Мажордом". "Объектный" стиль кода.

(Начало).

Ничего нового в нижеследующем коде нет, за исключением того, что он написан с использованием объектов и классов Delphi.

Код клиента в "объектном" стиле::

  
program mdclient;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  zmq_h,
  czmq_h,
  mdcliapi in 'mdcliapi.pas',
  mdp in 'mdp.pas';

procedure DoIt;
var
  i: Integer;
  fSession: TmdClient;
  fVerbose: Boolean;
  reply: p_zmsg_t;
  request: p_zmsg_t;
begin
  fVerbose := (ParamCount > 0) and (ParamStr(1) = '-v');
  Writeln(ParamStr(1));
  fSession := TmdClient.Create('tcp://localhost:5555', fVerbose);
  try

    i := 0;
    for i := 0 to 99999 do begin
      request := zmsg_new();
      zmsg_pushstr(request, 'Hello world');
      reply := fSession.Send('echo', request);
      if reply <> nil then
        zmsg_destroy(reply)
      else
        break; //  Прерывание или отказ
    end;

    zclock_log('%d requests / replies processed'#10, i);
  finally
    fSession.Free;
  end;

end;
begin
  DoIt;
  Readln;
end.


Код API клиента в "объектном" стиле::

  

unit mdcliapi;

interface
uses
  mdp
  , zmq_h
  , czmq_h
  , ZMQ_Utils;


type

  TmdClient = class(TObject)
  private
    Ctx: p_zctx_t; //  ZMQ контекст
    Broker: string;
    sctClient: Pointer; //  Сокет для связи с брокером
    Verbose: Boolean; //  Протоколирование в stdout
    Timeout: integer; //  Таймаут запроса
    Retries: integer; //  Число попыток

    procedure ConnectToBrocker;

  public
    constructor Create(aBroker: string; aVerbose: Boolean);
    destructor Destroy; override;
    function Send(service: PChar; var request: p_zmsg_t): p_zmsg_t;

  end;


implementation

uses
  SysUtils;


procedure TmdClient.ConnectToBrocker;
begin
  if sctClient <> nil then
    zsocket_destroy(ctx, sctClient);
  sctClient := zsocket_new(ctx, ZMQ_REQ);
  zmq_connect(sctClient, PChar(broker));
  if verbose then
    zclock_log('I: connecting to broker at %s...', Broker);
end;

constructor TmdClient.Create(aBroker: string; aVerbose: Boolean);
begin
  Ctx := zctx_new();
  Broker := aBroker;
  Verbose := aVerbose;
  Timeout := 2500; //  msecs
  Retries := 3; //  Before we abandon
  ConnectToBrocker();
end;

destructor TmdClient.Destroy;
begin
  inherited;
  zctx_destroy(Ctx);
  Broker := '';
end;



function TmdClient.Send(service: PChar; var request: p_zmsg_t): p_zmsg_t;
var
  header: p_zframe_t;
  item: zmq_pollitem_t;
  msg: p_zmsg_t;
  rc: Integer;
  reply_service: p_zframe_t;
  retries_left: Integer;
begin
  assert(request <> nil);

  //  В соответствии с требованиями протокола, должны быть фреймы-префиксы
  //  Фрейм 1: "MDPCxy" (шесть байт, MDP/Client x.y)
  //  Фрейм 2: Имя сервиса (печатная строка)
  zmsg_pushstr(request, service);
  zmsg_pushstr(request, cMDPC_CLIENT);
  if verbose then begin
    zclock_log('I: send request to ''%s'' service:', service);
    zmsg_print(request);
  end;
  retries_left := Retries;
  while (retries_left > 0) and (zctx_interrupted = 0) do begin
    msg := zmsg_dup(request); // Работаем с копией (вдруг оригинал пригодится?)
    zmsg_send(msg, sctClient);

    zPollItemInit(item, sctClient, 0, ZMQ_POLLIN, 0);

    //  При любом блокирующем вызове (libzmq), в случае ошибки  будет -1;
    //  Теоретически, следует учесть разные коды завершения,
    //  но на практике достаточно учесть {EINTR} (Ctrl-C):

    rc := zmq_poll(@item, 1, Timeout * ZMQ_POLL_MSEC);
    if rc = -1 then
      break; //  Прерван

    //  Что-то приняли, обрабатываем
    if (item.revents and ZMQ_POLLIN) <> 0 then begin
      msg := zmsg_recv(sctClient);
      if Verbose then begin
        zclock_log('I: received reply:');
        zmsg_print(msg);
      end;
      //  В реальном коде обрабботка отказа должна быть более тщательной
      assert(zmsg_size(msg) >= 3);

      header := zmsg_pop(msg);
      assert(zframe_streq(header, cMDPC_CLIENT));
      zframe_destroy(header);

      reply_service := zmsg_pop(msg);
      assert(zframe_streq(reply_service, service));
      zframe_destroy(&reply_service);

      zmsg_destroy(request);
      Result := msg; //  Успех
      Exit;
    end
    else if s_Dec(retries_left) > 0 then begin
      if (Verbose) then
        zclock_log('W: no reply, reconnecting...');
      ConnectToBrocker;
    end
    else begin
      if (Verbose) then
        zclock_log('W: permanent error, abandoning');
      break; //  Всё

    end;
  end;
  if zctx_interrupted <> 0 then
    zclock_log('W: interrupt received, killing client...'#10);
  zmsg_destroy(request);
  Result := nil;
end;


end.

Код Рабочего в "объектном" стиле::

  
program mdworker;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  zmq_h,
  czmq_h,
  mdwrkapi in 'mdwrkapi.pas',
  mdp in 'mdp.pas';

procedure DoIt;
var
  fVerbose: Boolean;
  fReply: p_zmsg_t;
  fRequest: p_zmsg_t;
  fSession: TmdWorker;
begin
  fVerbose := (ParamCount > 0) and (ParamStr(1) = '-v');
  fSession := TmdWorker.Create('tcp://localhost:5555', 'echo', fVerbose);
  fReply := nil;
  while (true) do begin
    fRequest := fSession.Recv(fReply);
    if (fRequest = nil) then
      break; //  Рабочий был прерван
    fReply := fRequest; //  Эхо ... :-)
  end;
  fSession.Free;
  Exit;

end;
begin
  doIt;
  Readln;
end.


Код API рабочего в "объектном" стиле::

  

unit mdwrkapi;

interface
uses
  zmq_h
  , czmq_h
  ;
type
  TmdWorker = class(TObject)
    Ctx: p_zctx_t; //  Наш контекст
    Broker: string;
    Service: string;
    sctWorker: Pointer; //  Сокет для связи с брокером Socket to broker
    Verbose: Boolean; //  Протоколировать действия в stdout

    //  Управление хартбитингом
    HeartbeatAt: uint64_t;
    Liveness: size_t; //  Сколько осталось попыток
    Heartbeat: integer; //  Задержка хартбитинга, в миллисекундах
    Reconnect: Integer; //  Задержка реконнекта в миллисекундах

    ExpectReply: Boolean;
    ReplyTo: p_zframe_t;
  private
    procedure ConnectToBroker;

    procedure SendToBroker(aCommand, aOption: PChar; aMsg: p_zmsg_t);
  public
    constructor Create(const aBroker, aService: string; aVerbose: Boolean);
    destructor Destroy; override;
    function Recv(var reply: p_zmsg_t): p_zmsg_t;


  end;
const
  //  Параметры надежности
  cHEARTBEAT_LIVENESS = 3; //  3-5 достаточно


implementation

uses
  ZMQ_Utils, mdp;


constructor TmdWorker.Create(const aBroker, aService: string; aVerbose: Boolean);
begin
  assert(aBroker <> '');
  assert(aService <> '');

  Ctx := zctx_new();
  Broker := aBroker;
  Service := aService;
  Verbose := aVerbose;
  Heartbeat := 2500; //  msecs
  Reconnect := 2500; //  msecs
  ConnectToBroker();

end;

destructor TmdWorker.Destroy;
begin
  inherited;
  zctx_destroy(Ctx);
  Broker := '';
  Service := '';
end;

function TmdWorker.Recv(var reply: p_zmsg_t): p_zmsg_t;
//  Метод сперва отсылает брокеру ответ, а затем ждет нового запроса.
//  Название метода странное, угу.

//  Отправляет ответ брокеру, если он есть и ждет следующего запроса


var
  fCommand: p_zframe_t;
  fEmpty: p_zframe_t;
  fHeader: p_zframe_t;
  fItem: zmq_pollitem_t;
  fMsg: p_zmsg_t;
  RC: Integer;
begin
  //  Формирование и отсылка ответа (reply), если он пустой
  assert((reply <> nil) or (not ExpectReply));
  if (reply <> nil) then begin
    assert(ReplyTo <> nil);
    zmsg_wrap(reply, ReplyTo);
    SendToBroker(cMDPW_REPLY, nil, reply);
    zmsg_destroy(reply);
  end;

  ExpectReply := true;

  while (true) do begin


    zPollItemInit(fItem, sctWorker, 0, ZMQ_POLLIN, 0);
    RC := zmq_poll(@fItem, 1, Heartbeat * ZMQ_POLL_MSEC);
    if RC = -1 then
      break; //  Прерван

    if (fItem.revents and ZMQ_POLLIN) <> 0 then begin
      fMsg := zmsg_recv(sctWorker);
      if fMsg = nil then
        break; //  Прерван
      if Verbose then begin
        zclock_log('I: received message from broker:');
        zmsg_print(fMsg);
      end;

      Liveness := cHEARTBEAT_LIVENESS;

      //  Ошибки не обрабатываем, просто шумим assert-от
      assert(zmsg_size(fMsg) >= 3);

      fEmpty := zmsg_pop(fMsg);
      assert(zframe_streq(fEmpty, ''));
      zframe_destroy(fEmpty);

      fHeader := zmsg_pop(fMsg);
      assert(zframe_streq(fHeader, cMDPW_WORKER));
      zframe_destroy(fHeader);

      fCommand := zmsg_pop(fMsg);
      if (zframe_streq(fCommand, cMDPW_REQUEST)) then begin
        //  Вообще-то, мы должны извлечь и сохранить все адреса вплоть до
        // пустого, но сейчас сохранем просто один...
        ReplyTo := zmsg_unwrap(fMsg);
        zframe_destroy(fCommand);

        //  В этом месте мы должны обрабатывать сообщение
        //  Сейчас просто возвращаем его в приложение

        Result := fMsg; //  Запрос на обработку
        Exit;

      end
      else
        if (zframe_streq(fCommand, cMDPW_HEARTBEAT)) then
          // ;               //  В случае хартбита ничего не делаем
        else
          if (zframe_streq(fCommand, cMDPW_DISCONNECT)) then
            ConnectToBroker()
          else begin
            zclock_log('E: invalid input message');
            zmsg_print(fMsg);
          end;
      zframe_destroy(&fCommand);
      zmsg_destroy(&fMsg);

    end
    else
      if s_Dec(Liveness) = 0 then begin
        if Verbose then
          zclock_log('W: disconnected from broker - retrying...');
        zclock_sleep(Reconnect);
        ConnectToBroker();
      end;
    //  Отправить хратбит, если пора
    if zclock_time() > HeartbeatAt then begin
      SendToBroker(cMDPW_HEARTBEAT, nil, nil);
      HeartbeatAt := zclock_time() + Heartbeat;
    end;

  end;
  if zctx_interrupted() <> 0 then
    z_log('W: interrupt received, killing worker...'#10);
  Result := nil;
end;

procedure TmdWorker.ConnectToBroker;
//  Коннект или реконнект к брокеру
begin
  if sctWorker <> nil then // Полный дисконнект, с разрушенеим сокета
    zsocket_destroy(Ctx, sctWorker);
  sctWorker := zsocket_new(Ctx, ZMQ_DEALER);
  zmq_connect(sctWorker, PChar(Broker));
  if Verbose then
    zclock_log('I: connecting to broker at %s...', PChar(Broker));

  //  Регистрация сервиса в брокере
  SendToBroker(cMDPW_READY, PChar(Service), nil);

  //  Если число попыток (liveness) обнулилось, считаем, что брокер отключен
  Liveness := cHEARTBEAT_LIVENESS;
  HeartbeatAt := zclock_time() + heartbeat;

end;


procedure TmdWorker.SendToBroker(aCommand, aOption: PChar; aMsg: p_zmsg_t);
//  Отправка сообщения брокеру
//  Если сообщения нет (пусто), создать его самостоятельно
begin
  if aMsg <> nil then
    aMsg := zmsg_dup(aMsg)
  else
    aMsg := zmsg_new();

  //  Формирование конверта сообщения в сответствии с протоколом
  if aOption <> nil then
    zmsg_pushstr(aMsg, aOption);
  zmsg_pushstr(aMsg, aCommand);
  zmsg_pushstr(aMsg, cMDPW_WORKER);
  zmsg_pushstr(aMsg, '');

  if Verbose then begin
    zclock_log('I: sending %s to broker',
      mdps_commands[Byte(aCommand^)]);
    zmsg_print(aMsg);

  end;
  zmsg_send(aMsg, sctWorker);
end;


end.


Если кто-то возжелает написать и брокер в объектном стиле, напоминаю, что вторым аргументом метода zhash_freefn() должна быть ссылка на функцию типа zhash_free_fn:
  
  zhash_free_fn = procedure(data: Pointer); cdecl;
Чтобы описать такой метод в рамках класса, следует использовать модификаторы class и static
  
    class procedure service_destroy(service: Pointer); static;
Честно говоря, я бы не стал заморачиваться с zhash_lookup(), а использовал бы вместо них, например, дельфийские TStringList с включенной сортировкой.

Комментариев нет :

Отправить комментарий