понедельник, 7 декабря 2015 г.

23.3 ZeroMQ: Пересылка файлов. Модель №3 - клиент использует конвейерное управление потоком на основе кредитования сервера запросами.

Начало - здесь.

Сервер может отсылать по 10 кусков за раз, затем ждать однократного подтверждения.
Это, в общем, бессмысленно: все равно что умножать размер куска на 10.
Сервер может отправлять клиенту порции файла без перезапросов от клиента, периодически делая небольшие паузы, чтобы загрузить сеть настолько, насколько она может справиться. Для этого сервер должен знать, что происходит с сетью. Получение такой информации представляется непростой задачей. Кроме того, непонятно, что делать, когда сеть быстрая, а клиенты медленные. Кто будет заниматься буферизацией сообщений?
Сервер мог бы отслеживать состояние исходящей очереди и отправлять сообщения тогда, когда в очереди есть свободное место. Но ZeroMQ не позволяет таких вольностей. И сервер, и сеть могут оказаться достаточно быстрыми, а клиент - оказаться маленьким медленным устройством.
Можно, в конце концов, модифицировать libzmq так, чтобы изменить поведение при достижении границы HWM. Возможно, нужно блокировать новые сообщения. Тоже не очень здорово: один маленький медленный клиент заблокирует весь сервер.
Можно попробовать возвращать клиенту сообщение с ошибкой. Тогда усложнится сервер. Ох. Пока самое лучшее, что может сделать сервер - это отбросить сообщение.
В общем, все эти варианты либо усложняют логику, либо вообще легко приводят систему в нерабочее состояние
Все, что нам нужно - это дать возможность клиенту возможность сообщать серверу о своей готовности к работе. Если мы все сделаем правильно, данные будут поступать к клиенту непрерывным потоком, но лишь тогда, когда клиент будет готов их принять.



23.2 ZeroMQ: Пересылка файлов. Модель №2 - клиент запрашивает каждую порцию файла по отдельности.


Продолжение (начало - здесь).

Вторая версия протокола пересылки файла предусматривает, что клиент каждый раз запрашивает по одной порции файла, а сервер, соответственно, возвращает по одной порции на каждый запрос, полученный от клиента:



пятница, 4 декабря 2015 г.

23.1 ZeroMQ: Пересылка файлов. Модель №1 - отправляем сразу весь файл большими порциями.


Задача: переслать файл.


ZMQ "искаропки" отлично справляется с пересылкой сообщений и оповещениями, но вот с пересылкой файлов все не так очевидно. Можно, конечно, тупо попробовать засунуть файл в сообщение целиком.
Однако, есть возражения:
1. Оперативная память, к сожалению, не резиновая.
2. Ладно, отправили по сети файл-сообщение. А сеть оказалась медленной (какой-нибудь древний вай-фай), да еще и неустойчивой. ZMQ, как мы помним, пытается обеспечить доставку сообщения даже в случае сбоев. Итак, несколько попыток пересылки по 1 гигабайту - здорово?
3. А если этот гигантский файл хотят скачать несколько клиентов одновременно? Каждому клиенту выделить буфер нужного размера, отправить и ждать? Свободная память закончится гораздо быстрее. То есть, "правильный" протокол передачи файлов должен учитывать ограниченность размеров ОЗУ.
...
...начинаем строить правильный протокол передачи файлов.

суббота, 10 января 2015 г.

22.6.1 ZeroMQ: надежные схемы "Запрос/Ответ". Сервис - ориентированная надежная очередь. Шаблон "Мажордом". "Объектный" стиль кода.

(Начало).

Ничего нового в нижеследующем коде нет, за исключением того, что он написан с использованием объектов и классов Delphi.

Код клиента в "объектном" стиле::

  
program mdclient;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  zmq_h,
  czmq_h,
  mdcliapi in 'mdcliapi.pas',
  mdp in 'mdp.pas';

procedure DoIt;
var
  i: Integer;
  fSession: TmdClient;
  fVerbose: Boolean;
  reply: p_zmsg_t;
  request: p_zmsg_t;
begin
  fVerbose := (ParamCount > 0) and (ParamStr(1) = '-v');
  Writeln(ParamStr(1));
  fSession := TmdClient.Create('tcp://localhost:5555', fVerbose);
  try

    i := 0;
    for i := 0 to 99999 do begin
      request := zmsg_new();
      zmsg_pushstr(request, 'Hello world');
      reply := fSession.Send('echo', request);
      if reply <> nil then
        zmsg_destroy(reply)
      else
        break; //  Прерывание или отказ
    end;

    zclock_log('%d requests / replies processed'#10, i);
  finally
    fSession.Free;
  end;

end;
begin
  DoIt;
  Readln;
end.


Код API клиента в "объектном" стиле::

  

unit mdcliapi;

interface
uses
  mdp
  , zmq_h
  , czmq_h
  , ZMQ_Utils;


type

  TmdClient = class(TObject)
  private
    Ctx: p_zctx_t; //  ZMQ контекст
    Broker: string;
    sctClient: Pointer; //  Сокет для связи с брокером
    Verbose: Boolean; //  Протоколирование в stdout
    Timeout: integer; //  Таймаут запроса
    Retries: integer; //  Число попыток

    procedure ConnectToBrocker;

  public
    constructor Create(aBroker: string; aVerbose: Boolean);
    destructor Destroy; override;
    function Send(service: PChar; var request: p_zmsg_t): p_zmsg_t;

  end;


implementation

uses
  SysUtils;


procedure TmdClient.ConnectToBrocker;
begin
  if sctClient <> nil then
    zsocket_destroy(ctx, sctClient);
  sctClient := zsocket_new(ctx, ZMQ_REQ);
  zmq_connect(sctClient, PChar(broker));
  if verbose then
    zclock_log('I: connecting to broker at %s...', Broker);
end;

constructor TmdClient.Create(aBroker: string; aVerbose: Boolean);
begin
  Ctx := zctx_new();
  Broker := aBroker;
  Verbose := aVerbose;
  Timeout := 2500; //  msecs
  Retries := 3; //  Before we abandon
  ConnectToBrocker();
end;

destructor TmdClient.Destroy;
begin
  inherited;
  zctx_destroy(Ctx);
  Broker := '';
end;



function TmdClient.Send(service: PChar; var request: p_zmsg_t): p_zmsg_t;
var
  header: p_zframe_t;
  item: zmq_pollitem_t;
  msg: p_zmsg_t;
  rc: Integer;
  reply_service: p_zframe_t;
  retries_left: Integer;
begin
  assert(request <> nil);

  //  В соответствии с требованиями протокола, должны быть фреймы-префиксы
  //  Фрейм 1: "MDPCxy" (шесть байт, MDP/Client x.y)
  //  Фрейм 2: Имя сервиса (печатная строка)
  zmsg_pushstr(request, service);
  zmsg_pushstr(request, cMDPC_CLIENT);
  if verbose then begin
    zclock_log('I: send request to ''%s'' service:', service);
    zmsg_print(request);
  end;
  retries_left := Retries;
  while (retries_left > 0) and (zctx_interrupted = 0) do begin
    msg := zmsg_dup(request); // Работаем с копией (вдруг оригинал пригодится?)
    zmsg_send(msg, sctClient);

    zPollItemInit(item, sctClient, 0, ZMQ_POLLIN, 0);

    //  При любом блокирующем вызове (libzmq), в случае ошибки  будет -1;
    //  Теоретически, следует учесть разные коды завершения,
    //  но на практике достаточно учесть {EINTR} (Ctrl-C):

    rc := zmq_poll(@item, 1, Timeout * ZMQ_POLL_MSEC);
    if rc = -1 then
      break; //  Прерван

    //  Что-то приняли, обрабатываем
    if (item.revents and ZMQ_POLLIN) <> 0 then begin
      msg := zmsg_recv(sctClient);
      if Verbose then begin
        zclock_log('I: received reply:');
        zmsg_print(msg);
      end;
      //  В реальном коде обрабботка отказа должна быть более тщательной
      assert(zmsg_size(msg) >= 3);

      header := zmsg_pop(msg);
      assert(zframe_streq(header, cMDPC_CLIENT));
      zframe_destroy(header);

      reply_service := zmsg_pop(msg);
      assert(zframe_streq(reply_service, service));
      zframe_destroy(&reply_service);

      zmsg_destroy(request);
      Result := msg; //  Успех
      Exit;
    end
    else if s_Dec(retries_left) > 0 then begin
      if (Verbose) then
        zclock_log('W: no reply, reconnecting...');
      ConnectToBrocker;
    end
    else begin
      if (Verbose) then
        zclock_log('W: permanent error, abandoning');
      break; //  Всё

    end;
  end;
  if zctx_interrupted <> 0 then
    zclock_log('W: interrupt received, killing client...'#10);
  zmsg_destroy(request);
  Result := nil;
end;


end.

Код Рабочего в "объектном" стиле::

  
program mdworker;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  zmq_h,
  czmq_h,
  mdwrkapi in 'mdwrkapi.pas',
  mdp in 'mdp.pas';

procedure DoIt;
var
  fVerbose: Boolean;
  fReply: p_zmsg_t;
  fRequest: p_zmsg_t;
  fSession: TmdWorker;
begin
  fVerbose := (ParamCount > 0) and (ParamStr(1) = '-v');
  fSession := TmdWorker.Create('tcp://localhost:5555', 'echo', fVerbose);
  fReply := nil;
  while (true) do begin
    fRequest := fSession.Recv(fReply);
    if (fRequest = nil) then
      break; //  Рабочий был прерван
    fReply := fRequest; //  Эхо ... :-)
  end;
  fSession.Free;
  Exit;

end;
begin
  doIt;
  Readln;
end.


Код API рабочего в "объектном" стиле::

  

unit mdwrkapi;

interface
uses
  zmq_h
  , czmq_h
  ;
type
  TmdWorker = class(TObject)
    Ctx: p_zctx_t; //  Наш контекст
    Broker: string;
    Service: string;
    sctWorker: Pointer; //  Сокет для связи с брокером Socket to broker
    Verbose: Boolean; //  Протоколировать действия в stdout

    //  Управление хартбитингом
    HeartbeatAt: uint64_t;
    Liveness: size_t; //  Сколько осталось попыток
    Heartbeat: integer; //  Задержка хартбитинга, в миллисекундах
    Reconnect: Integer; //  Задержка реконнекта в миллисекундах

    ExpectReply: Boolean;
    ReplyTo: p_zframe_t;
  private
    procedure ConnectToBroker;

    procedure SendToBroker(aCommand, aOption: PChar; aMsg: p_zmsg_t);
  public
    constructor Create(const aBroker, aService: string; aVerbose: Boolean);
    destructor Destroy; override;
    function Recv(var reply: p_zmsg_t): p_zmsg_t;


  end;
const
  //  Параметры надежности
  cHEARTBEAT_LIVENESS = 3; //  3-5 достаточно


implementation

uses
  ZMQ_Utils, mdp;


constructor TmdWorker.Create(const aBroker, aService: string; aVerbose: Boolean);
begin
  assert(aBroker <> '');
  assert(aService <> '');

  Ctx := zctx_new();
  Broker := aBroker;
  Service := aService;
  Verbose := aVerbose;
  Heartbeat := 2500; //  msecs
  Reconnect := 2500; //  msecs
  ConnectToBroker();

end;

destructor TmdWorker.Destroy;
begin
  inherited;
  zctx_destroy(Ctx);
  Broker := '';
  Service := '';
end;

function TmdWorker.Recv(var reply: p_zmsg_t): p_zmsg_t;
//  Метод сперва отсылает брокеру ответ, а затем ждет нового запроса.
//  Название метода странное, угу.

//  Отправляет ответ брокеру, если он есть и ждет следующего запроса


var
  fCommand: p_zframe_t;
  fEmpty: p_zframe_t;
  fHeader: p_zframe_t;
  fItem: zmq_pollitem_t;
  fMsg: p_zmsg_t;
  RC: Integer;
begin
  //  Формирование и отсылка ответа (reply), если он пустой
  assert((reply <> nil) or (not ExpectReply));
  if (reply <> nil) then begin
    assert(ReplyTo <> nil);
    zmsg_wrap(reply, ReplyTo);
    SendToBroker(cMDPW_REPLY, nil, reply);
    zmsg_destroy(reply);
  end;

  ExpectReply := true;

  while (true) do begin


    zPollItemInit(fItem, sctWorker, 0, ZMQ_POLLIN, 0);
    RC := zmq_poll(@fItem, 1, Heartbeat * ZMQ_POLL_MSEC);
    if RC = -1 then
      break; //  Прерван

    if (fItem.revents and ZMQ_POLLIN) <> 0 then begin
      fMsg := zmsg_recv(sctWorker);
      if fMsg = nil then
        break; //  Прерван
      if Verbose then begin
        zclock_log('I: received message from broker:');
        zmsg_print(fMsg);
      end;

      Liveness := cHEARTBEAT_LIVENESS;

      //  Ошибки не обрабатываем, просто шумим assert-от
      assert(zmsg_size(fMsg) >= 3);

      fEmpty := zmsg_pop(fMsg);
      assert(zframe_streq(fEmpty, ''));
      zframe_destroy(fEmpty);

      fHeader := zmsg_pop(fMsg);
      assert(zframe_streq(fHeader, cMDPW_WORKER));
      zframe_destroy(fHeader);

      fCommand := zmsg_pop(fMsg);
      if (zframe_streq(fCommand, cMDPW_REQUEST)) then begin
        //  Вообще-то, мы должны извлечь и сохранить все адреса вплоть до
        // пустого, но сейчас сохранем просто один...
        ReplyTo := zmsg_unwrap(fMsg);
        zframe_destroy(fCommand);

        //  В этом месте мы должны обрабатывать сообщение
        //  Сейчас просто возвращаем его в приложение

        Result := fMsg; //  Запрос на обработку
        Exit;

      end
      else
        if (zframe_streq(fCommand, cMDPW_HEARTBEAT)) then
          // ;               //  В случае хартбита ничего не делаем
        else
          if (zframe_streq(fCommand, cMDPW_DISCONNECT)) then
            ConnectToBroker()
          else begin
            zclock_log('E: invalid input message');
            zmsg_print(fMsg);
          end;
      zframe_destroy(&fCommand);
      zmsg_destroy(&fMsg);

    end
    else
      if s_Dec(Liveness) = 0 then begin
        if Verbose then
          zclock_log('W: disconnected from broker - retrying...');
        zclock_sleep(Reconnect);
        ConnectToBroker();
      end;
    //  Отправить хратбит, если пора
    if zclock_time() > HeartbeatAt then begin
      SendToBroker(cMDPW_HEARTBEAT, nil, nil);
      HeartbeatAt := zclock_time() + Heartbeat;
    end;

  end;
  if zctx_interrupted() <> 0 then
    z_log('W: interrupt received, killing worker...'#10);
  Result := nil;
end;

procedure TmdWorker.ConnectToBroker;
//  Коннект или реконнект к брокеру
begin
  if sctWorker <> nil then // Полный дисконнект, с разрушенеим сокета
    zsocket_destroy(Ctx, sctWorker);
  sctWorker := zsocket_new(Ctx, ZMQ_DEALER);
  zmq_connect(sctWorker, PChar(Broker));
  if Verbose then
    zclock_log('I: connecting to broker at %s...', PChar(Broker));

  //  Регистрация сервиса в брокере
  SendToBroker(cMDPW_READY, PChar(Service), nil);

  //  Если число попыток (liveness) обнулилось, считаем, что брокер отключен
  Liveness := cHEARTBEAT_LIVENESS;
  HeartbeatAt := zclock_time() + heartbeat;

end;


procedure TmdWorker.SendToBroker(aCommand, aOption: PChar; aMsg: p_zmsg_t);
//  Отправка сообщения брокеру
//  Если сообщения нет (пусто), создать его самостоятельно
begin
  if aMsg <> nil then
    aMsg := zmsg_dup(aMsg)
  else
    aMsg := zmsg_new();

  //  Формирование конверта сообщения в сответствии с протоколом
  if aOption <> nil then
    zmsg_pushstr(aMsg, aOption);
  zmsg_pushstr(aMsg, aCommand);
  zmsg_pushstr(aMsg, cMDPW_WORKER);
  zmsg_pushstr(aMsg, '');

  if Verbose then begin
    zclock_log('I: sending %s to broker',
      mdps_commands[Byte(aCommand^)]);
    zmsg_print(aMsg);

  end;
  zmsg_send(aMsg, sctWorker);
end;


end.


Если кто-то возжелает написать и брокер в объектном стиле, напоминаю, что вторым аргументом метода zhash_freefn() должна быть ссылка на функцию типа zhash_free_fn:
  
  zhash_free_fn = procedure(data: Pointer); cdecl;
Чтобы описать такой метод в рамках класса, следует использовать модификаторы class и static
  
    class procedure service_destroy(service: Pointer); static;
Честно говоря, я бы не стал заморачиваться с zhash_lookup(), а использовал бы вместо них, например, дельфийские TStringList с включенной сортировкой.

пятница, 9 января 2015 г.

22.6 ZeroMQ: надежные схемы "Запрос/Ответ". Сервис - ориентированная надежная очередь. Шаблон "Мажордом".

(Начало - здесь)

ZeroMQ: надежные схемы "Запрос/Ответ". Сервис - ориентированная надежная очередь. Шаблон "Мажордом".


Шаблон "Мажордом". Топология.


Прогресс ускоряется, когда в нем не участвуют юристы и разные комитеты. Одно-страничная спецификация MDP сразу  превращает PPP в нечто более солидное. Именно так и должна происходить разработка сложных архитектур: начинать следует с написания соглашений, а только потом писать софт, их реализующий.
Протокол "Мажордом" (The Majordomo Protocol - MDP) расширяет и углубляет PPP в одном интересном направлении: он добавляет "имя сервиса" к запросу, который отправляет клиент, и требует от рабочих регистрироваться для оказания конкретных услуг. Добавление имен сервиса превращает брокер из шаблона "Пират-параноик" в сервис - ориентированный брокер.

четверг, 8 января 2015 г.

22.5 ZeroMQ: надежные схемы "Запрос/Ответ". Хартбитинг в деталях.

  Запрос/Ответ. Надежные схемы "Запрос/Ответ". Хартбитинг в деталях.

Хартбитинг

 


Хартбитинг решает проблему распознавания "жив партнер или нет?" Эта проблема касается не только ZeroMQ. Протокол TCP имеет долгий таймаут (30 минут и больше), что делает невозможным определить - умер ли ли партнер, был ли дисконнект, или партнер просто уехал на выходные в Прагу пить водку.
Реализовать хартбитинг не так просто. Автор оригинала примеров для шаблона "Пират-параноик" пишет, что это заняло около пяти часов. Для реализации остальной части цепочки "запрос-ответ" потребовалось от силы десять минут. Особенно легко реализуются "ложные отказы", когда, к примеру, партнеры решают, что приключился дисконнект из-за неправильного хартбитинга.

В плане использования хартбитинга совместно с ZeroMQ разработчики обычно придерживаются трех подходов:

1. "А, и так сойдет!"


вторник, 6 января 2015 г.

22.4 ZeroMQ: надежные схемы "Запрос/Ответ". Высоконадежный брокер с очередью и хартбитингом (шаблон "Пират-параноик")


Запрос/Ответ. Высоконадежный брокер с очередью и хартбитингом (шаблон "Пират - параноик").

Шаблон "Пират - параноик"

 

 

Шаблон "Простой пират" достаточно хорош особенно тем, что он является простой комбинацией двух уже существующих шаблонов. Тем не менее, и у него тоже есть некоторые недостатки:

  • Он перестает работать в случае, когда очередь (брокер) падает и перезапускается. Клиент восстановится, а рабочие - нет. Хотя ZeroMQ и выполнит автоматический реконнект после перезапуска очереди, рабочие не пошлют сигнала "ГОТОВ" и, следовательно, не будут считаться доступными. Для исправления реализуем хартбитинг от очереди к рабочим так, чтобы рабочий смог определить, что очередь стала недоступной.
  • Очередь не обнаруживает отказов рабочих, поэтому, если рабочие падают во время простоя, брокер не может удалить таких рабочих их очереди доступных рабочих до тех пор, пока брокер не не пошлет такому рабочему запрос. Клиент будет ждать и делать перезапросы в никуда. Это не очень большая проблема, но это неприятно. Чтобы все работало правильно, реализуем хартбитинг от рабочего к очереди так, чтобы очередь могла определять потерянных рабочих на любом этапе.
Решим эти проблемы проблемы с помощью шаблона "Пират - параноик".