суббота, 27 декабря 2014 г.

22.3 ZeroMQ: надежные схемы "Запрос/Ответ". Надежность на основе прокси с очередью (шаблон "Простой пират")

  Запрос/Ответ. Надежность  на базе прокси с очередью (брокер с балансировкой нагрузки). Шаблон "Простой пират".

 Шаблон "Простой  Пират"

Расширим предыдущий шаблон обеспечения надежности типа "Ленивый пират", использовав прокси с очередями. Новый шаблон позволит общаться прозрачно с несколькими серверами, которые далее будем более точно называть "рабочими".

Во всех "пиратских"  шаблонах рабочие не сохраняют свое состояние. Разработка системы обмена сообщениями подразумевает, что мы ничего не знаем о том, что приложениям могут понадобится какие-либо разделяемые ресурсы, хранящие состояние, вроде баз данных. Использование прокси с очередями подразумевает, что рабочие могут приходить и уходить, не зная ничего о клиентах. Если падает один из рабочих, другой заменяет его. Это - хорошая, простая топология с одним слабым местом: а именно брокером с очередью в центре, который может стать проблемой для системы управления и единой точкой отказа.

Топология сети:


  
Прокси с очередями построен на основе балансировщика нагрузки, рассмотренного ранее (тынц). Что в действительности нужно сделать, чтобы обработать падение или блокировку рабочего? Это удивительно, но нужно совсем немного. У нас уже есть механизм повторных запросов на клиенте. Поэтому шаблон с балансировкой нагрузки будет работать достаточно хорошо. Это соответствует философии ZeroMQ насчет того, что мы можем расширить шаблон "peer-to-peer" до шаблона типа "Запрос - Ответ"  путем подключения между ними простейшего прокси.

Нам не нужен особый клиент, мы продолжим использовать клиента из шаблона "Ленивый пират". Вот код брокера с очередью, который, в общем, идентичен главной задаче из примера брокера с балансировкой нагрузки:


  
program Reliable_SimplePirate_Broker;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  zmq_h,
  czmq_h,
  ZMQ_Utils,
  Math;

// Брокер для шаблона Simple Pirate
// Такой же, как и для схемы "брокер с балансировкой нагрузки", без механизмов
// обеспечения надежности. Надежность зависит от восстановления клиентов.
// Брокер должен работать постоянно.


const
  c_WORKER_READY: Byte = 1; // Сигнал готовности рабочего


procedure doMain();
var
  backend: Pointer;
  ctx: p_zctx_t;
  frame: p_zframe_t;
  frontend: Pointer;
  identity: p_zframe_t;
  workers: p_zlist_t;
  items: array[0..1] of zmq_pollitem_t;
  msg: p_zmsg_t;
  rc: Integer;
begin
  ctx := zctx_new();
  frontend := zsocket_new(ctx, ZMQ_ROUTER);
  backend := zsocket_new(ctx, ZMQ_ROUTER);
  zsocket_bind(frontend, 'tcp://*:5555'); // Для клиентов
  zsocket_bind(backend, 'tcp://*:5556'); // Для рабочих


// Очередь доступных рабочих
  workers := zlist_new();

// Тело данного примера такое же, и знакомые брокеры с балансировкой нагрузки
  while true do begin
    zPollItemInit(items[0], backend, 0, ZMQ_POLLIN, 0);
    zPollItemInit(items[1], frontend, 0, ZMQ_POLLIN, 0);
// Опрос фронтэнда тольк ов случае, когда есть доступные рабочие
    rc := zmq_poll(@items[0], IfThen(zlist_size(workers) > 0, 2, 1), -1);
    if rc = -1 then
      break; // Прерван

// Обработка активности рабочего в бэкенде
    if (items[0].revents and ZMQ_POLLIN) <> 0 then begin
// Для балансировки нагрузки используем идентификацию рабочего
      msg := zmsg_recv(backend);
      if msg = nil then
        break; // Прерван
      identity := zmsg_unwrap(msg);
      zlist_append(workers, identity);

// Переправляем сообщение клиенту, если это не сигнал готовности с_READY
      frame := zmsg_first(msg);
      if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then
        zmsg_destroy(msg)
      else
        zmsg_send(msg, frontend);
    end;
    if (items[1].revents and ZMQ_POLLIN) <> 0 then begin
// Получам запрос от клиента, отправляем его первому свободному рабочему
      msg := zmsg_recv(frontend);
      if msg <> nil then begin
        zmsg_wrap(msg, zlist_pop(workers));
        zmsg_send(msg, backend);
      end
    end
  end;
// При выходе выполняем очистку
  while zlist_size(workers) > 0 do begin
    frame := zlist_pop(workers);
    zframe_destroy(frame);
  end;
  zlist_destroy(workers);
  zctx_destroy(ctx);
end;

begin
  doMain();
  Readln;

end.



А это - код рабочего, в которого превратился сервер "Ленивого пирата", немного адаптированный под шаблон балансировщика нагрузки (с использованием сокета REQ и сигнала "ГОТОВ"):

  
program Reliable_SimplePirate_Worker;

{$APPTYPE CONSOLE}

uses
  SysUtils
  , zmq_h
  , czmq_h
  , ZMQ_Utils
  , Math
  ;

// Рабочий для шаблона Simple Pirate
// Подключается сокетом REQ к tcp://*:5556
// Реализация рабочего для случая балансировки нагрузки

const
  c_WORKER_READY: Byte = 1; // Сигнал готовности рабочего


procedure doMain();
var
  ctx: p_zctx_t;
  cycles: Integer;
  frame: p_zframe_t;
  worker: Pointer;
  identity: string;
  msg: p_zmsg_t;
begin


  ctx := zctx_new();
  worker := zsocket_new(ctx, ZMQ_REQ);

// Для облегчения трассировки устанавливаем собственное произвольне значение
  Randomize();

  identity := Format('%4.0x-%4.0x', [Random($10000), Random($10000)]);
  zmq_setsockopt(worker, ZMQ_IDENTITY, PChar(identity), Length(identity));
  zsocket_connect(worker, 'tcp://localhost:5556');

// Сообщаем брокеру о готовности к работе
  z_Log(Format('I: (%s)worker ready', [identity]));
  frame := zframe_new(@c_WORKER_READY, 1);
  zframe_send(frame, worker, 0);

  cycles := 0;
  while True do begin
    msg := zmsg_recv(worker);
    if msg = nil then
      break; // Прерван

// После нескольких циклов - имитация различных проблем
    Inc(cycles);
    if (cycles > 3) and (Random(5) = 0) then begin
      z_Log(Format('I: (%s) simulating a crash', [identity]));
      zmsg_destroy(msg);
      break;
    end
    else
      if (cycles > 3) and (Random(5) = 0) then begin
        z_Log(Format('I: (%s) simulating CPU overload', [identity]));
        sleep(2000);
        if (zctx_interrupted = 1) then
          break; // Прерван
      end;
    z_Log(Format('I: (%s)normal reply', [identity]));
    sleep(1000); // Имитация "тяжелой" работы
    zmsg_send(msg, worker);
  end;
  zctx_destroy(&ctx);
end;

begin
  doMain();
//  Readln;

end.



Для тестирования запускаем произвольное количество рабочих, клиентов "Ленивый пират" брокер, в произвольном порядке. Наблюдаем, как в итоге все рабочие падают, а клиенты пытаются достучаться до них. Брокер с очередью никогда не останавливается, и мы может запускать рабочих и клиентов в произвольном количестве.
Эта модель работает с любым числом клиентов и рабочих.

Дальше мы рассмотрим супер-надежную схему с хартбитингом (шаблон "Пират - паранок"). (Продолжение)


Комментариев нет :

Отправить комментарий