понедельник, 3 ноября 2014 г.

15. ZeroMQ: реализация шаблона "Балансировка нагрузки".

(Начало здесь)

Шаблон "Балансировка Нагрузки"


Шаблон "Балансировка Нагрузки" чрезвычайно популярен. Он решает главную проблему, когда простые алгоритмы последовательной круговой(round robin) маршртизации (которые обеспечивают PUSH и DEALER) становятся неэффективными. Такое случается, когда для выполнения задач, решаемых рабочими процессами, требуется разное время.

Попробуем провести аналогию с почтовым офисом. Пусть у нас есть общая очередь клиентов перед прилавком, и некоторые покупают почтовые марки (быстрая, простая транзакция), а некоторые открывают новые счета (очень медленная, долгая транзакция). Становися очевидно, что покупатели почтовых марок застряли неоправданно надолго. Если ваша архитектура обмена сообщениями окажется неудачной, ваши клиенты будут раздражены, точно так же, как в почтовом офисе.

Для почтового офиса решением будет использование одной очереди так, чтобы даже когда один или два служащих зависнут на медленном задании, прочие клиенты будут обслуживаться другими служащими по принципу "первый пришел - первый обслужен".



Одной из причин использования упрощенного подхода в сокетах PUSH и DEALER является производительность. Если вы прибываете в любой из главных аэропортов США, вы увидите несколько длинных очередей людей, из ожидающих оформления процедуры иммиграции. Пограничники заранее отправляют людей к очереди конкретного служащего, а не заставляют всех стоять в одной общей очереди. Заставив людей заранее пройти полсотни метров, реально экономится 1-2 минут на обслуживание каждого пассажира. А так как проверка каждого паспорт занимает примерно одинаковое время, это обеспечивает что-то вроде справедливого обслуживания. Это и есть стратегия для сокетов PUSH и DEALER: распределить нагрузку заранее так, чтобы уменьшить расстояние для перемещения сообщения.

То есть, метод обслуживания в аэропорту отличается от метода в почтовом офисе.


Рассмотрим сценарий, когда рабочий процесс (сокетом DEALER или REQ) подключается к брокеру (к сокету ROUTER). Брокер знает, когда рабочий процесс готов к обслуживанию, и хранит список рабочих процессов. Поэтому он может всегда определить, какой из рабочих процессов наиболее редко использовался.

Решение задачи очень просто: рабочий посылает сообщение "готов" после того, как стартует, а также всякий раз после выполнения очередной задачи. Брокер читает сообщения последовательно, одно за другим. Понятно, что всякий раз, когда сообщение прочитано, оно принято от рабочего процесса, использованного последним . А так как используется сокет ROUTER - значит, в начале конверта сообщения содержится идентификатор, который позволяет оправить задачу рабочему процессу обратно.

Цикл запрос - ответ необходим, так как задание отправляется с ответом, а любой ответ на задание отправляется как новый запрос.

Следующий пример понятно демонстрирует описанное:

Основная нить приложения создает "слушающий" сокет ROUTER. Затем создается 10 рабочих нитей, каждая нить создает сокет REQ и коннектится к сокету ROUTER основной нити. Каждая рабочая нить в цикле посылает сообщение "Я готов!", потом получает задание и выполняет его, пока не получит сообщение "Свободен!".
Основная нить  в цикле читает конверт сообщения из сокета ROUTER. Первый кадр сообщения - идентификатор нити, которая его прислала. Значит, эта нить готова к выполнению работы, формируется обратный конверт и сообщение отправляется обратно.
Если задача выполнялась больше 5 секунд, основная нить отправляет сообщение "Fire!".


Код приложения, поясняющего идею балансировки нагрузки

program RouterToReq;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ_h, ZMQ_Utils, Windows;

const
  C_NMBR_WORKERS = 10;

procedure worker_task(args: Pointer); // Процедура рабочей нити
var
  fContext: Pointer;
  fSocketWorker: Pointer;
  fTotal: Integer;
  fWorkload: Utf8String;
begin
  fContext := zmq_ctx_new();
  fSocketWorker := zmq_socket(fContext, ZMQ_REQ);
  // Устанавливает случайное текстовое значение идентификатора для сокета
  s_set_id(fSocketWorker);
  zmq_connect(fSocketWorker, 'tcp://localhost:5671');

  fTotal := 0;
  while true do
  begin
// Сообщаем брокеру, что нить готова к работе
    s_send(fSocketWorker, 'Hi Boss');

// Получаем рабочее задание от брокера, пока не будет команды на прекращение
    fWorkload := s_recv(fSocketWorker);
    if fWorkload = 'Fired!' then
    begin
      z_Log(Format('Completed: %d tasks', [fTotal]));
      break;
    end;

    sleep(random(500) + 1); // Выполнение какой-то "полезной" работы
    Inc(fTotal);
  end;
  zmq_close(fSocketWorker);
  zmq_ctx_destroy(fContext);
end;

var
  fContext: Pointer;
  fSocketBroker: Pointer;
  i: Integer;
  fThrId: Cardinal;
  fFrequency: Int64;
  fStart: Int64;
  fStop: Int64;
  fDT: Int64;
  fWrkThreadsFired: Integer;
  fStrIdentity: string;
  fDummy: string;
begin
  fContext := zmq_ctx_new();
  fSocketBroker := zmq_socket(fContext, ZMQ_ROUTER);

  zmq_bind(fSocketBroker, 'tcp://*:5671');
  Randomize;

  // Запускаем пять рабочих нитей
  for i := 0 to Pred(C_NMBR_WORKERS) do
    BeginThread(nil, 0, @worker_task, nil, 0, fThrId);

 // Засекаем время
  QueryPerformanceFrequency(fFrequency);
  QueryPerformanceCounter(fStart);

// В течении пяти секунд шлем задания, а затем посылаем сообщение, чтобы остановились
  fWrkThreadsFired := 0;
  while true do
  begin

  // Следующее сообщение возвращает нить, первый выполнивший задание
    fStrIdentity := s_recv(fSocketBroker); // Кадр с идентификатором (фактически - обратный адрес)

    // Формируем конверт для ответа
    s_send(fSocketBroker, fStrIdentity, ZMQ_SNDMORE); // Первый кадр - обратный адрес

    fDummy :=  s_recv(fSocketBroker); // Пустой кадр - разделитель конверта
    s_send(fSocketBroker, '', ZMQ_SNDMORE); // Пустой кадр - разделитель конверта

    fDummy :=  s_recv(fSocketBroker); // Ответ от рабочей нити, тоже игнорируем


    QueryPerformanceCounter(fStop);
    fDT := (MSecsPerSec * (fStop - fStart)) div fFrequency;

    if fDT < 5000 then
      s_send(fSocketBroker, 'Work harder') // Шлем задание
    else begin
      s_send(fSocketBroker, 'Fired!'); // Команда на остановку
      Inc(fWrkThreadsFired);
      if fWrkThreadsFired = C_NMBR_WORKERS then
        break;
    end;
  end;
  zmq_close(fSocketBroker);
  zmq_ctx_destroy(fContext);
  readln;
end.



На выходе сокета ROUTER формируется конверт со следующей структурой:
№ кадраДлинаСодержаниеОписание
11002947ws5fwИдентификатор отправителя
20
Пустой кадр - разделитель
37Hi BossСигнал готовности рабочего потока


Видно, что кадры ответного сообщения формируется и отправляются параллельно со считыванием кадров входящего сообщения.
Это сделано, чтобы лишний раз напомнить: сокет ROUTER - асинхронный.

ИМХО, сообщения лучше читать кадр за кадром, логически отделяя процесс приема от процесса передачи. Вот так, например:


  // Следующее сообщение возвращает поток, первый выполнивший задание
    fStrIdentity := s_recv(fSocketBroker); // Кадр с идентификатором (фактически - обратный адрес)
    fDummy :=  s_recv(fSocketBroker); // Пустой кадр - разделитель конверта
    fDummy :=  s_recv(fSocketBroker); // Ответ от рабочего потока, тоже игнорируем

    // Формируем конверт для ответа
    s_send(fSocketBroker, fStrIdentity, ZMQ_SNDMORE); // Первый кадр - обратный адрес
    s_send(fSocketBroker, '', ZMQ_SNDMORE); // Пустой кадр - разделитель конверта

    QueryPerformanceCounter(fStop);
    fDT := (MSecsPerSec * (fStop - fStart)) div fFrequency;

    if fDT < 5000 then
      s_send(fSocketBroker, 'Work harder') // Шлем задание
    else begin
      s_send(fSocketBroker, 'Fired!'); // Команда на остановку
      Inc(fWrkThreadsFired);
      if fWrkThreadsFired = C_NMBR_WORKERS then
        break;
    end;

После запуска приложение усиленно что-то делает 5 секунд, потом каждый поток отчитывается о количестве задач, которые успел выполнит за эти 5 секундВывод программы:


...вроде нагрузка распределилась примерно поровну.
...

Брокер на сокете ROUTER, рабочие процессы - на сокете DEALER.


Везде, где можно использовать сокет REQ, можно использовать DEALER. При этом следует учесть отличия:
  • Сокет REQ всегда отправляет пустой кадр - разделитель конверта перед любыми кадрами с данными. Сокет DEALER так не делает.
  • Сокет REQ всегда отправляет одно сообщение перед тем, как принимает ответ. Сокет DEALER - полностью асинхронный.

В нашем примере нет никакой разницы - синхронная работа или асинхронная, так как придерживаемся строгой последовательности "запрос - ответ". Это станет важно позднее, когда мы займемся вопросами восстановления после сбоев.

Рассмотрим такой же пример, как и предыдущий, но заменим сокет REQ на сокет DEALER:

program RouterToDealer;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  ZMQ_h,
  ZMQ_Utils,
  Windows;

const
  C_NMBR_WORKERS = 10;

procedure worker_task(args: Pointer); // ПРоцедура рабочей нити
var
  fContext: Pointer;
  fDummy: string;
  fSocketWorker: Pointer;
  fTotal: Integer;
  fWorkload: Utf8String;
begin
  fContext := zmq_ctx_new();
  fSocketWorker := zmq_socket(fContext, ZMQ_DEALER);
  // Устанавливает случайное текстовое значение идентификатора для сокета
  s_set_id(fSocketWorker);
  zmq_connect(fSocketWorker, 'tcp://localhost:5671');

  fTotal := 0;
  while true do
  begin
// Сообщаем брокеру, что нить готова к работе
    s_send(fSocketWorker, '', ZMQ_SNDMORE); // Отправляем пустой кадр-разделитель конверта
    s_send(fSocketWorker, 'Hi Boss'); // Отправляем сообщение о готовности

// Получаем рабочее задание от брокера, пока не будет команды на прекращение
    fDummy := s_recv(fSocketWorker); // Пропускаем кадр - разделитель
    fWorkload := s_recv(fSocketWorker); // Получаем задание
    if fWorkload = 'Fired!' then
    begin
      z_Log(Format('Completed: %d tasks', [fTotal]));
      break;
    end;

    sleep(random(500) + 1); // Выполнение какой-то "полезной" работы
    Inc(fTotal);
  end;
  zmq_close(fSocketWorker);
  zmq_ctx_destroy(fContext);
end;

var
  fContext: Pointer;
  fSocketBroker: Pointer;
  i: Integer;
  fThrId: Cardinal;
  fFrequency: Int64;
  fStart: Int64;
  fStop: Int64;
  fDT: Int64;
  fWrkThreadsFired: Integer;
  fStrIdentity: string;
  fDummy: string;
begin
  fContext := zmq_ctx_new();
  fSocketBroker := zmq_socket(fContext, ZMQ_ROUTER);

  zmq_bind(fSocketBroker, 'tcp://*:5671');
  Randomize;

  // Запускаем десять рабочих нитей
  for i := 0 to Pred(C_NMBR_WORKERS) do
    BeginThread(nil, 0, @worker_task, nil, 0, fThrId);

 // Засекаем время
  QueryPerformanceFrequency(fFrequency);
  QueryPerformanceCounter(fStart);

// В течении пяти секунд шлем задания, а затем посылаем сообщение, чтобы остановились
  fWrkThreadsFired := 0;
  while true do
  begin

  // Следующее сообщение возвращает нить, первой выполнившей задание
    fStrIdentity := s_recv(fSocketBroker); // Кадр с идентификатором (фактически - обратный адрес)
    // Формируем конверт для ответа
    s_send(fSocketBroker, fStrIdentity, ZMQ_SNDMORE); // Первый кадр - обратный адрес

    fDummy :=  s_recv(fSocketBroker); // Пустой кадр - разделитель конверта
    fDummy :=  s_recv(fSocketBroker); // Ответ от рабочей нити, тоже игнорируем

    s_send(fSocketBroker, '', ZMQ_SNDMORE); // Пустой кадр - разделитель конверта

    QueryPerformanceCounter(fStop);
    fDT := (MSecsPerSec * (fStop - fStart)) div fFrequency;

    if fDT < 5000 then
      s_send(fSocketBroker, 'Work harder') // Шлем задание
    else begin
      s_send(fSocketBroker, 'Fired!'); // Команда на остановку
      Inc(fWrkThreadsFired);
      if fWrkThreadsFired = C_NMBR_WORKERS then
        break;
    end;
  end;
  zmq_close(fSocketBroker);
  zmq_ctx_destroy(fContext);
  readln;
end.


Этот код почти такой же, как предыдущий, за исключением того, что рабочие процессы используют сокет DEALER и читают и пишут пустой кадр перед кадром данных. Таким образом, сохранена совместимость с вариантом рабочих процессов на сокетах REQ.

Следует помнить, что пустой кадр - разделитель конверта необходим для того, чтобы позволить в последовательной цепочке запросов использовать завершающий сокет REQ, который использует данный разделитель для отсечения обратного адреса конверта так, чтобы можно было в приложении обрабатывать кадры данных.

Если сообщение никогда не будет передано к сокету REP, то для простоты мы можем с обоих сторон просто отбросить пустой кадр - разделитель. Что, в общем, часто и делается.
 ...

Брокер сообщений с балансировкой нагрузки

Предыдущий пример был завершен лишь наполовину. Брокер может управлять множеством рабочих процессов с фиктивными запросами и ответами, но не может общаться с клиентами. Если мы добавим еще один frontend ROUTER сокет, который принимает запросы клиентов, и превратим наш пример в настоящий прокси, который может передавать сообщения от frontend к backend, мы получим полезный и готовый для практического многократного повторного использования крошечный брокер сообщений с балансировкой нагрузки.

Этот брокер делает следующее:
  • принимает соединения от множества клиентов;
  • принимает соединения от множества рабочих процессов;
  • принимает запросы от клиентов и хранит их в общей очереди;
  • отсылает эти запросы рабочим процессам в соответствии со схемой «Балансировка нагрузки»;
  • обратно принимает ответы от рабочих процессов;

Исходник получается длинноват, но в нем стоит разобраться.



program LoadBalancingBroker;

{$APPTYPE CONSOLE}
// Брокер с балансировкой нагрузки.
//=================================
// Для упрощения запуска пример реализована как многонитевое приложение,
// с использованием протокола inproc
//=================================
// Имеется клиенты (c_NBR_CLIENTS шт) и рабочие (c_NBR_WORKERS шт).
// Каждый рабочий при запуске сообщает брокеру о своей готовности.
// Рабочий ставится в очередь.
//
// Клиент при запуске обращается к брокеру с заданием. Брокер отправляет
// задание первому свободному рабочему. Рабочий, выполнив задание, возвращает
// результат брокеру, брокер пересылает задание исходному клиенту



uses
  SysUtils, ZMQ_h, ZMQ_Utils, Windows;
const
  c_NBR_CLIENTS = 5; // Число клиентов
  c_NBR_WORKERS = 3; // // Число рабочих
  c_NMBR_REQ = 2; // // Число запросов от каждого клиента

  // Конечные точки подключения
  c_url_clients = 'inproc://clients';
  c_url_workers = 'inproc://workers';

procedure client_thread_proc(aContext: Pointer);
// Процедура нити клиента
var
  fSocketClient: Pointer;
  fReply: string;
  i: integer;
begin
  fSocketClient := zmq_socket(aContext, ZMQ_REQ);
  // Назначение идентификатора соединения (строка случайных символов)
  s_set_id(fSocketClient);
  zmq_connect(fSocketClient, c_url_clients); // Коннект к брокеру

  for i := 0 to Pred(c_NMBR_REQ) do begin
  // Отправка запроса, получение ответа
    s_send(fSocketClient, 'HELLO');
    fReply := s_recv(fSocketClient);
    z_Log(Format('Client : %s', [fReply]));
  end;
  zmq_close(fSocketClient);
end;


procedure worker_thread_proc(aContext: Pointer);
// Процедура рабочей нити
var
  fSocketWorker: Pointer;
  fIdentity: string;
  fEmpty: string;
  fRequest: string;
begin
  fSocketWorker := zmq_socket(aContext, ZMQ_REQ);
   // Назначение идентификатора соединения
  s_set_id(fSocketWorker);
  zmq_connect(fSocketWorker, c_url_workers); // Коннект к брокеру

  // Соощаем брокеру, что рабочая нить запущена и готова к работе
  s_send(fSocketWorker, 'READY');

  while true do // Рабочий цикл
  begin
    // Читаем и запоминаем все кадры вплоть пустого (fEmpty)
    // В данном примере кадров всего один, но реально их может быть больше
    fIdentity := s_recv(fSocketWorker); // Идентификатор клиента
    if zmq_errno() = ETERM then
      Break; // Уходим, если контекст в процессе завершения

    fEmpty := s_recv(fSocketWorker); // Кадр - разделитель
    Assert(fEmpty = '');
    // Получение запроса, отправка ответа
    fRequest := s_recv(fSocketWorker);
    z_Log(Format('Worker : %s', [fRequest]));

    Sleep(50); // Имитируем выполнение полезной работы

    // Формирование конверта составного сообщения:
    s_send(fSocketWorker, fIdentity, ZMQ_SNDMORE);
      // Идентификатор клиента
    s_send(fSocketWorker, '', ZMQ_SNDMORE); // Разделитель
    s_send(fSocketWorker, 'OK'); // Результат работы
  end;
  zmq_close(fSocketWorker);
end;


var
  fContext: Pointer;
  fSocketClients: Pointer;
  fSocketWorkers: Pointer;
  i: Integer;
  fThrId: Cardinal;
  fAvailableWorkers: Integer;
  fZMQPoll: array[0..1] of zmq_pollitem_t;
  fWrkrs_Que: array[0..Pred(c_NBR_WORKERS)] of string; // Очередь рабочих
  fRC: Integer;
  fWorker_id: string;
  fClient_id: string;
  fEmpty: string;
  fReplay: string;
  fRequest: string;
  fCliReqNmbr: Integer; // Номер клиентского запроса
  fThrWIds: array[0..Pred(c_NBR_WORKERS)] of Cardinal;
  fThrCIds: array[0..Pred(c_NBR_CLIENTS)] of Cardinal;
begin
 // Подготовка контекста и сокетов
  fContext := zmq_ctx_new();
  fSocketClients := zmq_socket(fContext, ZMQ_ROUTER);
  fSocketWorkers := zmq_socket(fContext, ZMQ_ROUTER);
  zmq_bind(fSocketClients, c_url_clients);
  zmq_bind(fSocketWorkers, c_url_workers);

  for i := 0 to Pred(c_NBR_WORKERS) do // Создаем рабочих
    fThrWIds[i] := BeginThread(nil, 0, @worker_thread_proc, fContext, 0, fThrId);

  for i := 0 to Pred(c_NBR_CLIENTS) do // Создаем клиентов
    fThrCIds[i] := BeginThread(nil, 0, @client_thread_proc, fContext, 0, fThrId);


// Главный цикл для LRU очереди. Используется два сокета: fSocketClients для
// клиентов и fSocketWorkers для рабочих. Опрос fSocketWorkers
// выполняется всегда, а fSocketClients - только тогда, когда есть один или
// больше готовых рабочих.
// Сообщения, которые еще не готовы к обработке, в ZMQ хранятся
// во встроенной очередей сообщений.
// Когда мы получаем запрос клиента, мы берем рабочего из начала
// очереди (fWrkrs_Que[0]) и посылаем ему запрос, которых включаеи исходный
// идентификатор клиента.
// Когда же приходит запрос от рабочего, этого рабочего ставим в конец очереди,
// а ответ переправляем исходному клиенту (используя идинтификатор в конверте).

  fAvailableWorkers := 0; // Число доступных рабочих

  fCliReqNmbr := 0;

  // Подготовка пула сокетов
  fZMQPoll[0].socket := fSocketWorkers;
  fZMQPoll[0].fd := 0;
  fZMQPoll[0].events := ZMQ_POLLIN;
  fZMQPoll[1].socket := fSocketClients;
  fZMQPoll[1].fd := 0;
  fZMQPoll[1].events := ZMQ_POLLIN;

  while fCliReqNmbr < c_NBR_CLIENTS * c_NMBR_REQ do begin

    fZMQPoll[0].revents := 0; // Сброс результатов опроса
    fZMQPoll[1].revents := 0;


    // Читать из fSocketClients только тогда, когда есть свободные рабочие
    // Если нет свободных - читать только из fSocketWorkers
    if fAvailableWorkers = 0 then
      fRC := zmq_poll(@fZMQPoll[0], 1, 11)
        // Только ждем готовности от рабочих
    else
      fRC := zmq_poll(@fZMQPoll[0], 2, 11); // Читаем оба сокета

    if fRC = -1 then
      Break; // Цикл прерван

    // Обрабока действий рабочих на fSocketWorkers
    if (fZMQPoll[0].revents and ZMQ_POLLIN) <> 0 then begin
      fWorker_id := s_recv(fSocketWorkers);
      Assert(fAvailableWorkers < c_NBR_WORKERS);
      // Помещаем рабочего в конец очереди
      fWrkrs_Que[fAvailableWorkers] := fWorker_id;
      Inc(fAvailableWorkers);

      // Второй кадр - пустой
      fEmpty := s_recv(fSocketWorkers);
      assert(fEmpty = '');

      // Третий кадр - готовность ("READY"), иначе это Id клиента в ответе
      fClient_id := s_recv(fSocketWorkers);

      // Если это ответ клиенту, отправить ответ в fSocketClients
      if fClient_id <> 'READY' then begin
        fEmpty := s_recv(fSocketWorkers);
        Assert(fEmpty = '');
        fReplay := s_recv(fSocketWorkers);
        s_send(fSocketClients, fClient_id, ZMQ_SNDMORE);
        s_send(fSocketClients, '', ZMQ_SNDMORE);
        s_send(fSocketClients, fReplay);
        Inc(fCliReqNmbr); // Число обслуженных запросов

      end;
    end;

    // Обработка запросов клиентов:
    if (fZMQPoll[1].revents and ZMQ_POLLIN) <> 0 then
    begin
      // Получаем очередной клиентский запрос, отправляем его рабочему из
      // начала очереди
      // Конверт запроса клиента: [identity][empty][request]
      fClient_id := s_recv(fSocketClients);
      fEmpty := s_recv(fSocketClients);
      Assert(fEmpty = '');
      fRequest := s_recv(fSocketClients);

      s_send(fSocketWorkers, fWrkrs_Que[0], ZMQ_SNDMORE);
      s_send(fSocketWorkers, '', ZMQ_SNDMORE);
      s_send(fSocketWorkers, fClient_id, ZMQ_SNDMORE);
      s_send(fSocketWorkers, '', ZMQ_SNDMORE);
      s_send(fSocketWorkers, fRequest);

      // Извлечение из очереди
      Dec(fAvailableWorkers);
      for i := 0 to Pred(fAvailableWorkers) do
        fWrkrs_Que[i] := fWrkrs_Que[i + 1];
    end;
  end;

  for I := 0 to High(fThrCIds) do // Ждем завершения клиентов
    WaitForSingleObject(fThrCIds[i], INFINITE);

  zmq_close(fSocketClients);
  zmq_close(fSocketWorkers);
  zmq_ctx_destroy(fContext);

//  for I := 0 to High(fThrWIds) do
//    WaitForSingleObject(fThrWIds[i], INFINITE);

  Readln;

end.


Исходник избыточно комментирован, поэтому добавлю только пару замечаний.

1. Обращаем внимание на цикл рабочей нити worker_thread_proc():

  while true do // Рабочий цикл
  begin
    // Читаем и запоминаем все кадры вплоть пустого (fEmpty)
    // В данном примере кадров всего один, но реально их может быть больше
    fIdentity := s_recv(fSocketWorker); // Идентификатор клиента
    if zmq_errno() = ETERM then
      Break; // Уходим, если контекст в процессе завершения

Так как для связи между нитями используется протокол inproc, необходимо использовать общий контекст, который передается как параметр в процедуру нити.
Так вот, когда основная нить завершается, выполняется закрытие контекста ZMQ:

  zmq_ctx_destroy(fContext);

Следовательно, обращение к сокетам контекста вызовет ошибку ETERM, что в данном случае считается признаком необходимости завершения основного цикла процедуры нити.

2. Основным источником задач в примере является процедура нити клиентов client_thread_proc(). Следовательно, при завершении приложения есть смысл дождаться завершения нитей клиентского слоя. Для этого при создании нитей слоя клиентов запоминаются дескрипторы нитей, а при завершении процедуры выполняется ожидание завершения всех нитей клиентов:

var
...
  fThrCIds: array[0..Pred(c_NBR_CLIENTS)] of Cardinal;
...
begin
...
  for i := 0 to Pred(c_NBR_CLIENTS) do // Создаем клиентов
    fThrCIds[i] := BeginThread(nil, 0, @client_thread_proc, fContext, 0, fThrId);
...
...
...
  for I := 0 to High(fThrCIds) do // Ждем завершения клиентов
    WaitForSingleObject(fThrCIds[i], INFINITE);

  zmq_close(fSocketClients);
  zmq_close(fSocketWorkers);
  zmq_ctx_destroy(fContext);

Еще немного поясню.

В алгоритме две сложные вещи: 

  1. упаковка сообщений в конверты при каждом чтении и записи;
  2. сам алгоритм балансировки нагрузки


Рассмотрим весь путь сообщения схемы "Запрос - от клиента до рабочего процесса и обратно. В коде есть вызовы
:
  
s_set_id(fSocketClient);
и
s_set_id(fSocketWorker);

- это установка идентичности сокетов в форме набора случайных символов в строках. Такая форма используется исключительно для облегчения процесса отслеживания сообщений. В реальности, можно не задавать идентификацию самому, а позволить это сделать сокету ROUTER.

Предположим, что идентификатором клиента будет строка "CLIENT", а рабочего - "WORKER". Предположим, что клиентское приложение посылает один кадр, содержащий "Hello".


Сообщение, отсылаемое клиентом:
№ кадраДлинаСодержаниеОписание
15HelloКадр с данными


Сокет REQ сам добавляет в начало пустой кадр - разделитель. Далее, сокет ROUTER добавляет идентификатор соединения. В итоге прокси читает уже адрес (идентификатор) клиента, пустой кадр и данные:

Сообщение, которое читает прокси с фронтэнд сокета ROUTER:
№ кадраДлинаСодержаниеОписание
15ClientИдентификатор клиента
20
Пустой кадр - разделитель
35HelloКадр с данными

Брокер отправляет все это рабочему, предварив двумя кадрами: идентификаторо рабочего и разделителем.

Сообщение, которое брокер отправляет в бэкэнд сокета ROUTER:
№ кадраДлинаСодержаниеОписание
16WorkerИдентификатор рабочего
20
Пустой кадр - разделитель
35ClientИдентификатор клиента
40
Пустой кадр - разделитель
55HelloКадр с данными


Этот конверт распаковывается сначала бэкенд - сокетом ROUTER, который удаляет первый кадр и отправляет все сокету REQ рабочего. Потом сокет REQ рабочего удаляет пустой кадр, а остаток передает в приложение рабочей нити:


Сообщение, которое получает рабочий:
№ кадраДлинаСодержаниеОписание
15ClientИдентификатор клиента
20
Пустой кадр - разделитель
35HelloКадр с данными


Рабочий сохраняет все части сообщения, что-то делает с данными и возвращает результат. 
На обратном пути сообщение проходит те же самые этапы, то есть, например, на бэкэнд сокет брокеру передается сообщение из пяти частей, а брокер отправляет в фронтенд сокет сообщение из трех частей, в итоге клиент получает сообщение из одной части.

~~~~~~~~~~~~~~~~~

Теперь по поводу алгоритма балансировки нагрузки.

Со стороны и клиента, и рабочего используются сокеты REQ. Рабочий должен корректно получить, запомнить и правильно вернуть получаемые конверты . Алгоритм:

  • создается пул сокетов. Пул всегда содержит бэкэнд сокет (сторона рабочего), и, если есть хоть один свободный рабочий - фронтэнд сокет (сторона клиента).
  • пул активируется с бесконечным таймаутом;
  • если есть активность со стороны бэкэнда (рабочего), мы читаем либо сообщение "READY", либо идентификатор клиента, для которого рабочий отправил сообщение. В обоих случаях мы помещаем адрес рабочего (первый кадр конверта) в конец очереди свободных рабочих, а оставшуюся часть (если это ответ клиенту) отправляем через фронтенд сокет клиенту.
  •  если есть активность со стороны фронтэнда (клиент), мы берем запрос клиента, извлекаем из очереди адрес очередного рабочего (который использовался последним), и отправляем запрос в бэкэнд (рабочему). В конверте теперь первый кадр - адрес рабочего, потом разделитель, потом все три части клиентского запроса.

Данная схема легко расширяется. Например, в процессе работы работники могли бы запускать счетчики производительности для оценки собственного быстродействия и отчитываться брокеру о результатах. А брокер мог бы выбирать самого быстрого из работников, использованных последними.
...
Примеры становятся все длиннее и сложнее. Есть смысл перейти на более высокий уровень абстракции: начать использовать библиотеку CZMQ: (Продолжение).

Комментариев нет :

Отправить комментарий