четверг, 25 декабря 2014 г.

22.2 ZeroMQ: надежные схемы "Запрос/Ответ". Надежность на стороне клиента (шаблон "Ленивый пират")

  Запрос/Ответ. Надежность на стороне клиента. Шаблон "Ленивый пират".

 Шаблон "Ленивый  Пират"

 

 

Топология сети:




Небольшие изменения на стороне клиента позволяют получить надежную схему "Запрос-Ответ".
Вместо того, чтобы выполнять блокирующее чтение (receive) из сокета, поступаем иначе:



  • Выполняем поллинг сокета REQ и читаем из него данные только тогда, когда они действительно присутствуют.
  • Повторно отправляем запрос, если не было ответа в течение заданного времени.
  • Отменяем транзакцию, если ответа не было получено в течении нескольких запросов.
Если попытаться использовать сокет REQ как-то иначе, чем строго в режиме последовательных  операций Запрос/Ответ, будет сформирована ошибка (так как технически сокет REQ реализован как простой конечный автомат, выполняющий запрос/ответ, а код ошибки такой операции называется "EFSM"). Это слегка напрягает, когда нужно использовать сокет REQ в пиратском шаблоне, так как мы можем захотеть отправить несколько запросов перед тем, как запросим ответ.
После ошибки выполняем операцию закрытия и повторного открытия сокета REQ.

Шаблон "Ленивый пират", код клиента:

  
program Reliable_LazyPirate_Client;

{$APPTYPE CONSOLE}

uses
  SysUtils
  , zmq_h
  , czmq_h
  , zmq_utils
  ;

// Клиент Lazy Pirate
// Для безопасной схемы запрос/ответ используется zmq_poll
// Для проверкаи работы следует запустить Lazy Pirate сервер,
// а потом произвольно закрывать и перезапускать его

const
  c_REQUEST_TIMEOUT = 2500; // msecs, (> 1000!)
  c_REQUEST_RETRIES = 3; // Число попыток, потом - "алярам"
  c_SERVER_ENDPOINT = 'tcp://localhost:5555';


procedure doMain();
var
  client: Pointer;
  ctx: p_zctx_t;
  expect_reply: Integer;
  pollitem: zmq_pollitem_t;
  rc: Integer;
  reply: PChar;
  request: string;
  retries_left: Integer;
  sequence: Integer;
begin

  ctx := zctx_new();
  z_log('I: connecting to server...');
  client := zsocket_new(ctx, ZMQ_REQ);
  assert(client <> nil);
  zsocket_connect(client, c_SERVER_ENDPOINT);

  sequence := 0;
  retries_left := c_REQUEST_RETRIES;
  while (retries_left > 0) and (zctx_interrupted = 0) do begin
// Отсылаем запрос, затем рабоаем над получением ответа
    Inc(sequence);
    request := IntToStr(sequence);
    zstr_send(client, PChar(request));

    expect_reply := 1;
    while expect_reply <> 0 do begin
// Опрос сокета на наличе ответа, с таймаутом
      zPollItemInit(pollitem, client, 0, ZMQ_POLLIN, 0);
      rc := zmq_poll(@pollitem, 1, c_REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
      if rc = -1 then
        break; // Прерван

// Обработка ответа сервер выход из цикла, если ответ валидный
// Если ответ не получен, закрываем сокет client и отправляем запрос заново.
// Так делаем несколько раз, пока не поймем, что все ёбнулось

      if (pollitem.revents and ZMQ_POLLIN) <> 0 then begin
// Получаем ответ  от сервера, должен совпадать со значением в sequence
        reply := zstr_recv(client);
        if reply = nil then
          break; // Прерван
        if StrToInt(reply) = sequence then begin
          z_log('I: server replied OK' + reply);
          retries_left := c_REQUEST_RETRIES;
          expect_reply := 0;
        end
        else
          z_Log('E: malformed reply from server: ' + reply);

        zstr_free(reply);
      end
      else if retries_left = 1 then begin
        Dec(retries_left);
        z_Log('E: server seems to be offline, abandoning');
        break;
      end
      else begin
        Dec(retries_left);
        z_Log('W: no response from server, retrying...');
// Отказ старого сокета. Закрываем его и открываем новый
        zsocket_destroy(ctx, client);
        z_Log('I: reconnecting to server...');
        client := zsocket_new(ctx, ZMQ_REQ);
        zsocket_connect(client, c_SERVER_ENDPOINT);
// На новый сокет отправляем запрос повторно
        zstr_send(client, PChar(request));
      end
    end
  end;
  zctx_destroy(&ctx);
end;

begin
  doMain();
  Readln;
end.



Шаблон Ленивый пират, код сервера:

  

program Reliable_LazyPirate_Server;

{$APPTYPE CONSOLE}

uses
  SysUtils
  , zmq_h
  , czmq_h
  , zmq_utils
  ;

// Сервер Lazy Pirate
// Подвязывается к сокету REQ на tcp://*:5555
// Точно такой же, как и серве HelloWorld, за исключением:
// - эхо-ответы возвращает в том же виде, в каком были получены запросы;
// - случайным образом подтормаживает или завершается, имитируя падение.


procedure doMain();
var
  context: Pointer;
  cycles: Integer;
  request: String;
  server: Pointer;
begin


  Randomize;

  context := zmq_ctx_new();
  server := zmq_socket(context, ZMQ_REP);
  zmq_bind(server, 'tcp://*:5555');
  cycles := 0;
  while true do begin

    request := s_recv(server);
    Inc(cycles);

// После нескольких циклов - имитация различных пробем
    if (cycles > 3) and (Random(3) = 0) then begin
      z_Log('I: simulating a crash');
      break;
    end
    else if (cycles > 3) and (Random(3) = 0) then begin
      z_Log('I: simulating CPU overload');
      sleep(2000);
    end;
    z_Log(Format('I: normal request (%s)', [request]));
    sleep(1000); // Имитация выполнения какой-то работы
    s_send(server, request);
  end;
  zmq_close(server);
  zmq_ctx_destroy(context);


end;

begin
  doMain();
  Readln;
end.




Запускаем оба приложения в отдельных консолях, наблюдаем примерно следующее:
Вывод сервера:




Вывод клиента:



Клиент последовательно отправляет каждое сообщение и проверяет, что ответы вернулись в том же же самом порядке, и это означает, что запросы или ответы не потерялись, и что ответов не пришло больше, чем один, или ответ пришел не в том порядке.
Чтобы убедиться в правильно работе механизма, следует запустить приложения подряд несколько раз. В реальном приложении нет нужды нумеровать сообщения, это нужно только для того, чтобы убедиться, что все работает так, как нужно.
Клиент использует сокет REQ, и, если нужно, грубо перезапускает его методом close/reopen, так как сокеты REQ работают в жестком цикле запрос-ответ. Можно вместо этого использовать сокет DEALER, но это было бы не очень хорошо. Во-первых, понадобилось бы вспомнить о том, что делает сокет REQ  с конвертом сообщения (если мы забыли об этом - это хороший сигнал, значит, мы не хотим этим заниматься). Во-вторых, это бы означало, что мы потенциально готовы получить тот ответ, которого не ожидаем.
Обработка отказов исключительно на клиентской стороне работает, когда несколько клиентов общаются с одним сервером. Обработка может пережить крах сервера, но только в случае, когда под восстановлением подразумевается перезапуск того же самого сервера. Если происходит долговременный  отказ вроде пропадания питания на сервере, такой подход перестает работать. Так как код серверного приложения обычно является величайшим источником отказов при любой архитектуре, зависимость от одного сервера является не очень хорошей идеей.

Итак, плюсы и минусы шаблона "Ленивый пират":
  • Плюс: прост для понимания и реализации.
  • Плюс: легко работает с существующими кодом приложений клиента и сервера.
  • Плюс: в процессе работы ZeroMQ автоматически реконнетится и повторно отправляет запросы.
  • Минус: не переходит на резервные или алтернативные серверы в случае отказа.

Дальше мы рассмотрим основы построения надежной схемы "запрос-ответ" с использованием брокера с очередью (шаблон "Простой пират"). (Продолжение)


Комментариев нет :

Отправить комментарий