вторник, 6 января 2015 г.

22.4 ZeroMQ: надежные схемы "Запрос/Ответ". Высоконадежный брокер с очередью и хартбитингом (шаблон "Пират-параноик")


Запрос/Ответ. Высоконадежный брокер с очередью и хартбитингом (шаблон "Пират - параноик").

Шаблон "Пират - параноик"

 

 

Шаблон "Простой пират" достаточно хорош особенно тем, что он является простой комбинацией двух уже существующих шаблонов. Тем не менее, и у него тоже есть некоторые недостатки:

  • Он перестает работать в случае, когда очередь (брокер) падает и перезапускается. Клиент восстановится, а рабочие - нет. Хотя ZeroMQ и выполнит автоматический реконнект после перезапуска очереди, рабочие не пошлют сигнала "ГОТОВ" и, следовательно, не будут считаться доступными. Для исправления реализуем хартбитинг от очереди к рабочим так, чтобы рабочий смог определить, что очередь стала недоступной.
  • Очередь не обнаруживает отказов рабочих, поэтому, если рабочие падают во время простоя, брокер не может удалить таких рабочих их очереди доступных рабочих до тех пор, пока брокер не не пошлет такому рабочему запрос. Клиент будет ждать и делать перезапросы в никуда. Это не очень большая проблема, но это неприятно. Чтобы все работало правильно, реализуем хартбитинг от рабочего к очереди так, чтобы очередь могла определять потерянных рабочих на любом этапе.
Решим эти проблемы проблемы с помощью шаблона "Пират - параноик".



 Топология сети для схемы "Пират - параноик":





Ранее для рабочего мы использовали сокет REQ. Для реализации шаблона "Пират - параноик" мы перейдем на сокет DEALER. Это даст возможность отправлять и принимать сообщения в любые моменты времени, а не просто в последовательности "запрос/ответ", как того требует жесткая архитектура сокета REQ. Недостатком сокета DEALER является необходимость ручного управления построением структуры конверта сообщения. (См. Схема "Издатель - Подписчик", подробности, структура конверта).

Мы продолжаем использовать клиента из шаблона "Ленивый пират".



Код очереди (брокера) для шаблона "Пират - параноик":

  
program Reliable_ParanoidPirate_Broker;

{$APPTYPE CONSOLE}

uses
  SysUtils
  , zmq_h
  , czmq_h
  , zmq_utils
  , Math
  ;

// Очередь (брокер) для шаблона "Пират - параноик"
const
  c_HEARTBEAT_LIVENESS = 3; // Обычно 3-5
  c_HEARTBEAT_INTERVAL = 1000; // msecs

// Константы для протокола "Пират- параноик"
  c_PPP_READY: Byte = 1; // Сигнал готовности рабочего
  c_PPP_HEARTBEAT: Byte = 2; // Сигнал хартбитинга рабочего

type
  TWorker = class
    // Класс, описывающий рабочего.
    public
    identity: p_zframe_t; // Идентификатор рабочего
    id_string: PChar; // Строковое представление идентификатора
    expiry: int64_t; // Момент смерти
    procedure Ready(workers: p_zlist_t);
    constructor Create(a_identity: p_zframe_t);
    destructor Destroy; override;
  end;

// Метод s_workers_next возвращает идентификатор следующего доступного рабочего:
function
  s_workers_next(workers: p_zlist_t): p_zframe_t;
var
  worker: TWorker;
begin
  worker := zlist_pop(workers);
  assert(worker <> nil);
  Result := worker.identity;
  worker.identity := nil;
  worker.Free;
end;


procedure s_workers_purge(workers: p_zlist_t);
// Метод s_workers_purge находит и уничтожает все объекты мертвых рабочих.
// Обрабатка идет от старейших к недавно использованным. Таким образом, мы
// останавливаемся на первом доступном рабочем:
var
  worker: TWorker;
begin
  worker := zlist_first(workers);
  while worker <> nil do begin
    if zclock_time() < worker.expiry then
      break; // Рабочий жив, используем

    zlist_remove(workers, worker);
    worker.Free;
    worker := zlist_first(workers);
  end;
end;


procedure doMain();
// Основаня задача представляет собой балансировщик нагрузки с
// харбитингом к рабочим. Таким образом, мы можем обнаружить упавшие
// или зависшие задачи рабочих:
var
  backend: Pointer;
  ctx: p_zctx_t;
  frame: p_zframe_t;
  frontend: Pointer;
  heartbeat_at: uint64_t;
  identity: p_zframe_t;
  items: array[0..1] of zmq_pollitem_t;
  msg: p_zmsg_t;
  rc: Integer;
  worker: TWorker;
  workers: p_zlist_t;
begin
  ctx := zctx_new();
  frontend := zsocket_new(ctx, ZMQ_ROUTER);
  backend := zsocket_new(ctx, ZMQ_ROUTER);
  zsocket_bind(frontend, 'tcp://*:5555'); // Для клиентов
  zsocket_bind(backend, 'tcp://*:5556'); // Для рабочих

// Список доступных рабочих
  workers := zlist_new();

// Посылаем хартбиты через одинаковые интервалы времени
  heartbeat_at := zclock_time() + c_HEARTBEAT_INTERVAL;

  while True do begin
    zPollItemInit(items[0], backend, 0, ZMQ_POLLIN, 0);
    zPollItemInit(items[1], frontend, 0, ZMQ_POLLIN, 0);

// Опрашиваем фронтенд только тогда, когда есть доступные рабочие
    rc := zmq_poll(@items[0], IfThen(zlist_size(workers) > 0, 2, 1),
      c_HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
    if rc = -1 then
      break; // Прерван

// Обработка активности рабочена на бэкэнде
    if (items[0].revents and ZMQ_POLLIN) <> 0 then begin
// Используем идентификацию рабочего для балансировщика нагрузки
      msg := zmsg_recv(backend);
      if msg = nil then
        break; // Прерван

// Любое проявление активноси рабочего подразумевает его готовность
      identity := zmsg_unwrap(msg);
      worker := TWorker.Create(identity);
      worker.Ready(workers);

// Валидация управляющего сообщения, или возврат ответа клиенту
      if zmsg_size(msg) = 1 then begin
        frame := zmsg_first(msg);
        if not (CompareMem(zframe_data(frame), @c_PPP_READY, 1)
          or CompareMem(zframe_data(frame), @c_PPP_HEARTBEAT, 1)) then begin
          z_Log('E: invalid message from worker');
          zmsg_print(msg);
        end;
        zmsg_destroy(msg);
      end
      else
        zmsg_send(msg, frontend);
    end;
    if (items[1].revents and ZMQ_POLLIN) <> 0 then begin
// Получаем следующий запрос клиента и распределяем его следующему рабочему
      msg := zmsg_recv(frontend);
      if msg = nil then
        break; // Прерван
      zmsg_push(msg, s_workers_next(workers));
      zmsg_send(msg, backend);
    end;
// Обрабатываем хартбитинг после любой активности сокета. Сначала отправляем
// хартбиты всем простаивающим в данный момент времени рабочим. Затем очищаемся
// от мертвых рабочих:
    if (zclock_time() >= heartbeat_at) then begin
      worker := zlist_first(workers);
      while worker <> nil do begin
        // Строим пакет сообщения. ZFRAME_REUSE - без разрушения фрейма
        zframe_send(worker.identity, backend, c_ZFRAME_REUSE + c_ZFRAME_MORE);
        frame := zframe_new(@c_PPP_HEARTBEAT, 1);
        zframe_send(frame, backend, 0);
        worker := zlist_next(workers);
      end;
      // Следующий хартбит
      heartbeat_at := zclock_time() + c_HEARTBEAT_INTERVAL;
    end;
    s_workers_purge(workers); // Сбрасываем мертвых рабочих
  end; // while

// Завершение работы, очистка.
  while zlist_size(workers) > 0 do begin
    worker := zlist_pop(workers);
    worker.Free;
  end;
  zlist_destroy(workers);
  zctx_destroy(ctx);
end;

constructor TWorker.Create(a_identity: p_zframe_t);
begin
// Создание объекта рабочего
  identity := a_identity;
  id_string := zframe_strhex(identity);
  expiry := zclock_time()
    + c_HEARTBEAT_INTERVAL * c_HEARTBEAT_LIVENESS;
end;

destructor TWorker.Destroy;
begin
// Разрушение объекта рабочего, в том числе идентифицирущего фрейма
// и строкового идентификатора.
// Разрушаем с помощью методов czmq, так как эти объекты создавались
// в недельфийском диспетчере памяти
    zframe_destroy(identity);
    zstr_free(id_string);
  inherited;
end;


procedure TWorker.Ready(workers: p_zlist_t);
var
  worker: TWorker;
begin
// Метод ready помещает  рабочего в конец списка готовых рабочих:
  worker := zlist_first(workers);
  while worker <> nil do begin
    if string(id_string) = string(worker.id_string) then begin
      zlist_remove(workers, worker);
      worker.Free;
      break;
    end;
    worker := zlist_next(workers);
  end;
  zlist_append(workers, self);
end;

begin
  IsMultiThread := True;
  doMain();
  readln;
end.

Очередь представляет собой усложненный шаблон балансировки нагрузки с хартбитингом рабочих. Хартбитинг - не самая простая для понимания вещь. Однако, все станет совершенно ясно через пару секунд.

Код рабочего для шаблона "Пират - параноик":

 
program Reliable_ParanoidPirate_Worker;

{$APPTYPE CONSOLE}

uses
  SysUtils
  , zmq_h
  , czmq_h
  , zmq_utils
  , Math
  ;



// Рабочий для шаблона "Пират - параноик"

const
  c_HEARTBEAT_LIVENESS = 3; // Нормально, если 3-5
  c_HEARTBEAT_INTERVAL = 1000; // msecs
  c_INTERVAL_INIT = 1000; // Начальный интервал реконнекта
  c_INTERVAL_MAX = 32000; // После экпоненциальной отсрочки

// Константы протокола "Пират - параноик"
  c_PPP_READY: Byte = 1; // Сигнал готовности рабочего
  c_PPP_HEARTBEAT: Byte = 2; // Сигнал хартбитинга рабочего

// Вспомогательная функция, возвращающая заново настроенный сокет,
// подключенный к очереди шаблоона "Параллельный пират"

function
  s_worker_socket(ctx: p_zctx_t): pointer;
var
  frame: p_zframe_t;
begin
  Result := zsocket_new(ctx, ZMQ_DEALER);
  zsocket_connect(Result, 'tcp://localhost:5556');

// Сообщаем очереди о готовности к работе
  z_Log('I: worker ready');
  frame := zframe_new(@c_PPP_READY, 1);
  zframe_send(frame, Result, 0);
end;



procedure doMain;
var
  ctx: p_zctx_t;
  cycles: Integer;
  frame: p_zframe_t;
  heartbeat_at: uint64_t;
  interval: size_t;
  items: zmq_pollitem_t;
  liveness: size_t;
  msg: p_zmsg_t;
  rc: Integer;
  worker: pointer;
begin

// Протокол "Пират - параноик" (Paranoid Pirate Protocol - PPP) со стороны
// рабочего выполнен в виде одной задачи. В данном случае особенно интересна
// реализация хартбитинга, которая позволяет рабочему определять доступность
// очереди (и наоборот)

  ctx := zctx_new();
  worker := s_worker_socket(ctx);

// Если индикатор живучести liveness обнуляется, считаем, что связь
// с очередью прервалась
  liveness := c_HEARTBEAT_LIVENESS;
  interval := c_INTERVAL_INIT;

// Отсылает хартбиты через равные промежутки времени
  heartbeat_at := zclock_time() + c_HEARTBEAT_INTERVAL;

  Randomize();
  cycles := 0;
  while true do begin
    zPollItemInit(items, worker, 0, ZMQ_POLLIN, 0);
    rc := zmq_poll(@items, 1, c_HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
    if rc = -1 then
      break; // Прерван

    if (items.revents and ZMQ_POLLIN) <> 0 then begin
// Получаем сообщение. Если оно состоит из
// - 3х частей (конверт + содержимое) -> это запрос
// - 1й части (хартбит) -> это хартбит
      msg := zmsg_recv(worker);
      if msg = nil then
        break; // Прерван

// Тестируем живучесть реализации очереди. Для этого имитируем различные
// проблемы: падение рабочего или подвисание рабочего.
// Делаем это после выполнения нескольких циклов таки образом, чтобы система
// изначально смогла запуститься:
      if zmsg_size(msg) = 3 then begin
        Inc(cycles);
        if (cycles > 3) and (Random(5) = 0) then begin
          z_Log('I: simulating a crash');
          zmsg_destroy(msg);
          break;
        end
        else
          if (cycles > 3) and (Random(5) = 0) then begin
            z_Log('I: simulating CPU overload');
            sleep(3000);
            if zctx_interrupted = 1 then
              break;
          end;
        z_Log('I: normal reply');
        zmsg_send(msg, worker);
        liveness := c_HEARTBEAT_LIVENESS;
        sleep(1000); // Do some heavy work
        if zctx_interrupted = 1 then
          break;
      end
      else
// При получения от очереди сообщения харбитинга считаем, что очередь
// (была) жива, поэтому сбрасываем индикатор живучести liveness:
        if zmsg_size(msg) = 1 then begin
          frame := zmsg_first(msg);
          if CompareMem(zframe_data(frame), @c_PPP_HEARTBEAT, 1) then
            liveness := c_HEARTBEAT_LIVENESS
          else begin
            z_Log('E: invalid message');
            zmsg_print(msg);
          end;
          zmsg_destroy(&msg);
        end
        else begin
          Z_log('E: invalid message');
          zmsg_print(msg);
        end;
      interval := c_INTERVAL_INIT;
    end
    else begin
// Если очередь так и не прислала хартбитинг, уничтожаем сокет и реконнектимся.
// Вот такой простой и суровый способ отбросить все отправленные сообщения:
      Dec(liveness);
      if liveness = 0 then begin
        Z_log('W: heartbeat failure, can''t reach queue');
        Z_log(Format('W: reconnecting in %ud msec...', [interval]));
        zclock_sleep(interval);

        if (interval < c_INTERVAL_MAX) then
          interval := interval * 2;
        zsocket_destroy(ctx, worker);
        worker := s_worker_socket(ctx);
        liveness := c_HEARTBEAT_LIVENESS;
      end;
    end;
// Отправить хартбит в очередь, если пришло время
    if (zclock_time() > heartbeat_at) then begin
      heartbeat_at := zclock_time() + c_HEARTBEAT_INTERVAL;
      Z_log('I: worker heartbeat');
      frame := zframe_new(@c_PPP_HEARTBEAT, 1);
      zframe_send(frame, worker, 0);
    end
  end;
  zctx_destroy(ctx);

end;

begin
  doMain;
  Readln;
end.



Комментарии к примеру:
  • Как и раньше, в коде присутствует имитация отказов. Это делает код довольно сложным для отладки и опасным для повторного использования. Когда возникнет надобность в отладке такого кода, следует отключить имитацию отказов.
  • Рабочий использует стратегию реконнекта, похожую на ту, которая используется в клиента шаблона "Ленивый пират", но с двумя важными отличиями: (a) реконнект выполняется через экспоненциально возрастающие промежутки времени, и (b) он выполняет его бесконечно (тогда как клиент после сбоя делает это фиксированное число раз).
Запускам клиента, очередь и рабочих. Например, с помощью командного файла, содержащего скрипт вроде этого:
 
start Reliable_ParanoidPirate_Broker.exe
for /l %%i in (1,1,5) do start Reliable_ParanoidPirate_Worker.exe
timeout /t 1
start Reliable_LazyPirate_Client.exe

В первой строке запускается очередь (брокера) Reliable_ParanoidPirate_Broker.exe.
Во второй строке запускается пять экземпляров рабочих Reliable_ParanoidPirate_Worker.exe.
В третьей строке задается пауза в 1 секунду для того, чтобы рабочие подключились к очереди.
В четвертой строке запускается клиент. 


После запуска мы наблюдаем рабочих, имитирующих отказы (зависания и падения), и клиента, который в итоге отвалится, когда не останется доступных рабочих. Мы можем остановить и перезапустить очередь, при этом клиент рабочие выполнят реконнект и подхватят работу. И неважно, что делается с очередью и с рабочими: клиент никогда не отвалится, так как работает либо вся цепочка, либо клиент уходит в отказ.


Далее мы рассмотрим харбитинг более детально. (Продолжение)

Комментариев нет :

Отправить комментарий