вторник, 4 ноября 2014 г.

19. ZeroMQ: Асинхронный клиент-сервер.

(Начало - здесь).

Шаблон "Асинхронный клиент/сервер".

Будем создавать архитектуру сети N-1, когда несколько разных клиентов асинхронно общаются с одним сервером.

Работать это будет вот так:

- клиенты коннектятся к серверу и отправляют запросы;
- на каждый запрос сервер отправляет 0 или больше ответов;
- клиенты могут отправлять множество запросов без ожидания ответов;
- серверы могут отправлять множество ответов без ожидания новых запросов.

Топология:





В целях упрощения тестирования, кодировать будем в рамках одного процесса, как мы уже делали несколько раз.
Т.е., множество нитей будет имитировать многопроцессную архитектуру.
При запуске примера видно, что три клиента (каждый со случайным id) выводят на консоль ответы от сервера.
Если присмотреться к результатам, выведенным на консоль, то можно заметить, что каждая задача клиента порождает ноль или больше ответов на каждый запрос.

Код приложения:

program hl_AsynchClientServer;

{$APPTYPE CONSOLE}

uses
  SysUtils
  , zmq_h
  , czmq_h
  ;

// Асинхронные клиент - сервер ( от DEALER к ROUTER)
//
// Так как это пример, для облегчения запуска все работает в рамках одного процесса.
// В реальности каждая задача должна быть отдельным процессом.


function client_task(args: Pointer): Pointer; cdecl;
// Задача клиента
// Клиент коннектится к серверу и шлет ему запросы по одному в секунду.
// Собирает ответы в порядке поступления а потом распечатывает.
// Несколько клиентов работаеют параллельно, каждый со своим Id.
var
  centitick: Integer;
  client: Pointer;
  ctx: p_zctx_t;
  identity: string;
  items: zmq_pollitem_t;
  msg: p_zmsg_t;
  request_nbr: Integer;
begin
  ctx := zctx_new();
  client := zsocket_new(ctx, ZMQ_DEALER);

// Случайный идентификатор (текст: для облегчения трассировки)
  identity := Format('%4x - %4x', [Random($10000), Random($10000)]);
  zsocket_set_identity(client, PChar(identity));
  zsocket_connect(client, 'tcp://localhost:5570');

  items.socket := client;
  items.fd := 0;
  items.events := ZMQ_POLLIN;
  items.revents := 0;
  request_nbr := 0;
  while true do begin
// Тики по одному в секунду: получаем приходящие сообщения
    for centitick := 0 to 99 do begin
      zmq_poll(@items, 1, 10 * ZMQ_POLL_MSEC);
        // Опрос с таймаутом 0,01 сек
      if (items.revents and ZMQ_POLLIN) <> 0 then begin // Что-то есть
        msg := zmsg_recv(client);
        zframe_print(zmsg_last(msg), PChar(identity));
        zmsg_destroy(msg);
      end;
    end;
    Inc(request_nbr);
    zstr_sendf(client, PChar('request # %d'), request_nbr);
  end;
  zctx_destroy(ctx);
  result := nil;
end;

// Задача сервера
// Используется многонитевая модель. Для вытягивания запросов в пул
// рабочих и распределения ответов обратно клиентам. Один рабочий может
// одновременно обработать один запрос, но один клиент может общаться с
// множеством рабочих одновременно.

procedure server_worker(args: Pointer; ctx: p_zctx_t; pipe: Pointer); cdecl; forward;

function server_task(args: Pointer): Pointer;
var
  backend: Pointer;
  ctx: p_zctx_t;
  frontend: Pointer;
  thread_nbr: Integer;
begin
// Фронтенд сокет общается с клиентами по tcp
  ctx := zctx_new();
  frontend := zsocket_new(ctx, ZMQ_ROUTER);
  zsocket_bind(frontend, 'tcp://*:5570');

// Бэкенд сокет общается с рабочими по inproc
  backend := zsocket_new(ctx, ZMQ_DEALER);
  zsocket_bind(backend, 'inproc://backend');

// Запуск пула рабочих нитей; точное количество не важно.
  for thread_nbr := 0 to 4 do
    zthread_fork(ctx, @server_worker, nil);

// Коннект бэкэнда к фронтэнду через прокси
  zmq_proxy(frontend, backend, nil);

  zctx_destroy(ctx);
  result := nil;

end;


procedure server_worker(args: Pointer; ctx: p_zctx_t; pipe: Pointer); cdecl;
// Каждая задача рабочего работает одновременно над одним запросом и
// отправляет случайное число ответов со случайными паузами между ответами:
var
  content: p_zframe_t;
  identity: p_zframe_t;
  msg: p_zmsg_t;
  replies: Integer;
  reply: Integer;
  worker: Pointer;
begin
  worker := zsocket_new(ctx, ZMQ_DEALER);
  zsocket_connect(worker, 'inproc://backend');

  while true do begin
// Сокет DEALER дает нам конверт ответа и мообщение
    msg := zmsg_recv(worker);
    identity := zmsg_pop(msg);
    content := zmsg_pop(msg);
    assert(content <> nil);
    zmsg_destroy(msg);

// Отправка обратно 0..4 ответов
    replies := Random(5);
    for reply := 0 to pred(replies) do begin
// Sleep какое-то случайное время
      zclock_sleep(Random(1000) + 1);
      zframe_send(identity, worker, c_ZFRAME_REUSE + c_ZFRAME_MORE);
      zframe_send(content, worker, c_ZFRAME_REUSE);
    end;
    zframe_destroy(identity);
    zframe_destroy(content);
  end;
end;


procedure DoMain;
// Главная нить просто запускает 5 клиентов и 1 сервер и ждет, когда
// сервер завершится.
begin
  Randomize;
  zthread_new(@client_task, nil);
  zthread_new(@client_task, nil);
  zthread_new(@client_task, nil);
  zthread_new(@server_task, nil);
  zclock_sleep(5 * 1000);
    // Работаем 5 секунд, потом завершение
end;

begin 
//При создании многонитевого Delphi приложения обязательно задавать  
// значение True для переменной System.IsMultyThread. 
 IsMultyThread := True;
 DoMain;
  Readln; // Для отладки
end.


Некоторые замечания к коду примера.

Клиенты шлют запросы раз в секунду и получают обратно ноль или несколько ответов. Чтобы такое сделать с помощью zmq_poll(), мы не может просто опрашивать с 1-секундным таймаутом, или мы завершим отправку нового запроса только через 1 секунду после того, как мы примем последний ответ. Поэтому мы опрашиваем с высокой частотой (100 раз в секунду, по 1/100 секунде на опрос ), что можно считать достаточно точным.

Сервер использует пул рабочих нитей, и каждая из них обрабатывает один запрос синхронно. Он соединяет их со своими фронтэнд сокетами с помощью внутренней очереди. Соединение фронтэнд и бэкэнд сокетов выполняется с помощью вызова zmq_proxy().

Таким образом, более детальная структура асинхронного сервера такова:



Стоит подчеркнуть, что для диалога между клиентами и сервером используются сокеты DEALER -> ROUTER, в то время как внутри сервера, для общения между главной нитью сервера и рабочими используются сокеты DEALER -> DEALER. Если бы сокеты были строго синхронными, то мы бы использовали сокет REPORT. Однако, так как мы хотим отправлять множество ответов, нам нужен асинхронный сокет. Мы не хотим маршрутизировать ответы, так как они всегда идут к одной серверной нити, который слал нам запрос.


Посмотрим на маршрутизацию конверта сообщения. Клиент посылает сообщение, содержащие один кадр. Нить сервера принимает сообщение из двух кадров: исходное сообщение с префиксом из идентификатора клиента. Мы отправляем эти два кадра рабочему, который рассматривает его как обычный конверт ответа и возвращает его нам как двухкадровое сообщение. Затем мы первый кадр используем как идентификатор для маршрутизации второго кадра при отправке ответа клиенту в качестве ответа.

Схема примерно такая:
Client
Server
frontend
Worker
[ DEALER ]<----->[ ROUTER <----->DEALER<----->DEALER ]

1 часть
2 части
2 части



Теперь насчет сокетов. Для реализации балансировщика нагрузуки для общения с рабочими мы могли бы использовать схему ROUTER -> DEALER, но это повлекло бы много дополнительной работы. В данном случае схема DEALER -> DEALER очевидно предпочтительнее, обеспечивая компромисс между низкой латентностью для каждого запроса и повышенным риском разбалансировки нагрузки рабочих. В данном случае было сделано "как проще".

При построении серверов, которые сохраняют состояние клиентов, возникают классические проблемы. Если сервер хранит состояние каждого клиента, а клиентов подключается все больше и больше, в конце концов наступает момент исчерпания ресурсов. Даже если одни и те же клиенты сохраняют коннект, и если вы используете идентификацию по умолчанию, то каждый соединение будет выглядеть как новое.
В примере выше мы немного сжульничали, сохраняя состояние только в течении короткого времени (время, необходимое работнику для обработки запроса), а затем сбрасывая состояние.
Но во многих случаев это непрактично.

Для правильного управления состоянием клиента в асинхронном сервере, необходимо:


  • От клиента к серверу следует реализовать heartbeating. В нашем примере, мы шлем запросы по одному в секунду, и это достоверно можно считать хартбитингом.
  • Сохраняйте состояние клиента, используя идентификатор (сгенерированный автоматически или как-либо иначе) в качестве ключа.
  • Следите и реагируйте на остановку хартбита. Если от клиента не приходо запроса, скажем, в течении двух секунд, сервер может это обнаружить и уничтожить все состояния, связанные с данным клиентом.

Хартбит - heartbeat - это периодический сигнал, генерируемый аппаратно или программно, предназначенный для определения нормального функционирования системы либо для синхронизации других компонентов системы. Обычно хартбит - это посылка между машинами с регулярными интервалами времени порядка секунды. Если хартбит не принимается в течении какого-то вреиени - обычно в течении нескольких интервалов хартбита - то считается, что машина, которая должна посылать хартбит, неисправна.

Пример: Система "Периметр".


Дальше будет рассмотрена задача межброкерной маршрутизации: (Продолжение).

3 комментария :

  1. Всем привет!
    Возможно ли сделать асинхронную работу сервера без создания нити на каждого клиента? т.е. работать чисто по сообщениям в одной нити.

    ОтветитьУдалить
    Ответы
    1. судя по этому посту http://stackoverflow.com/questions/27069359/does-zeromq-have-a-notification-callback-event-message-for-when-data-arrives этим вопросом задаются и другие, и ответ, по всей видимости, отрицательный

      Удалить
    2. Если каждый клиентский запрос не будет требовать настолько долгой обработки сервером, чтобы переполнить очередь входных сообщений - почему бы и нет?
      Вынесение обработки сообщений в отдельную (возможно, единственную) нить полезно, когда нужно, чтобы сервер оперативно откликался на посторонние запросы. Например, отрисовывал бы графический интерфейс.

      Удалить