среда, 3 декабря 2014 г.

21.4 ZeroMQ: рабочий пример. Межброкерная маршрутизация. Вот что получилось.

(Начало - здесь).




Соберем все вместе.


Как и раньше, отдельный кластер будет представлен одним процессом.

Кода получилось довольно много, так как здесь объединяется логика обоих предыдущих примеров. Он достаточно хорошо имитирует кластеры, включающие клиентов и рабочих. 

Код:

program Peering3;

{$APPTYPE CONSOLE}

uses
  //  FastMM4,
  SysUtils
  , Classes
  , Windows
  , zmq_h
  , czmq_h
  , zmq_utils
  , math
  ;

// Симуляция брокера партнеров (часть 3)
// Прототипирование потоков данных состостояния и задач

// Запуск:peering свой_порт список_портов_партнеров
//
// Для облачной связи используются порты:
//
// cloudfe (входные запросы) = свой_порт
// cloudbe (запросы к партнерам) = чужие_порты
//
// statefe (подписка на состояние партнеров) = 1чужие порты
// statebe (публикация своего состояния для партнеров) = 1свой_порт
//


// Для локальной связи используется протокол inproc:
// localfe (входные запросы от клиентов) = localfe_inproc
// localbe (запросы к рабочим) = localbe_inproc
// monitor (служебные сообщения) = monitor_inproc

const
  c_NBR_CLIENTS = 10;
  c_NBR_WORKERS = 5;
  c_WORKER_READY: byte = 1; // Сигнал "рабочий готов"

  // Свое собственное имя (порт ввода-вывода).
  // В реальности должно быть задано при конфиргурировнии узла

var
  self: PChar = nil;

function zFormat(const aFormat: string;
  const aArgs: array of const): string;
var
  fFS: TFormatSettings;
begin
  GetLocaleFormatSettings(GetUserDefaultLCID(), fFS);
  Result := Format(aFormat, aArgs, fFS)
end;


function client_task(args: Pointer): Pointer; cdecl;
// Клиентская задача. Несколько секунд спит, затем порождает пачку запросов.
// Это имитирует спорадическую активность, когда одновременно активно множество
// клиентов и локальные рабочие должны оказаться перегружены.
// Для запросов используется сокет REQ.
// Для статистики данные отправляются в сокет мониторинга.

var
  client: Pointer;
  monitor: Pointer;
  burst: Integer;
  task_id: pChar;
  pollset: zmq_pollitem_t;
  rc: Integer;
  reply: PChar;
begin
  client := zsocket_new(args, ZMQ_REQ);
  rc := zsocket_connect(client, 'inproc://localfe_inproc');
  monitor := zsocket_new(args, ZMQ_PUSH);
  rc := zsocket_connect(monitor, 'inproc://monitor_inproc');

  while True do begin
    sleep(Random(5) * 1000);

    for burst := 0 to Random(10) do begin
      task_id := PChar('Task ' + IntToHex(Random($10000), 4) + 'h from ' + self);
      // Запрос со случайным значением id в 16сс
      rc := zstr_send(client, task_id);

      // Ответа ждать максимум 10 секунд, потом сообщить о проблеме
      zPollItemInit(pollset, client, 0, ZMQ_POLLIN, 0);
      rc := zmq_poll(@pollset, 1, 10 * 1000 * ZMQ_POLL_MSEC);
      if rc = -1 then
        break; // Прерван

      if (pollset.revents and ZMQ_POLLIN) <> 0 then begin
        reply := zstr_recv(client);
        if reply = nil then
          break; // Прерван
        //        assert(string(reply) = string(task_id)); // !!!! Это - сравнение строк
        zstr_send(monitor, reply);
      end
      else begin
        zstr_send(monitor,
          PChar('E: CLIENT EXIT - lost task ' + task_id));
        result := nil;
        Exit;
      end
    end
  end;
  Result := nil;
end;



function worker_task(args: Pointer): Pointer; cdecl;
// Задача рабочего. Для подключения к балансировщику нагрузки использует сокет
// REQ. То же самое, что и в предудущих peering* примерах

var
  frame: p_zframe_t;
  msg: p_zmsg_t;
  new_msg: string;
  rc: Integer;
  worker: Pointer;
  str_msg: PChar;
begin
  worker := zsocket_new(args, ZMQ_REQ);
  rc := zsocket_connect(worker, 'inproc://localbe_inproc');

  // Сообщаем брокеру о готовноти к работе
  frame := zframe_new(@c_WORKER_READY, 1);
  rc := zframe_send(frame, worker, 0);

  // Process messages as they arrive
  while true do begin
    msg := zmsg_recv(worker);
    if msg = nil then
      break; // Прерван
    // Рабочие как бы что-то делают 0 или 1 секунду
    Sleep(Random(2) * 1000);

    frame := zmsg_last(msg);
    str_msg := zframe_strdup(frame);

    new_msg := str_msg + ' processed by ' + self;

    zframe_reset(frame, PChar(new_msg), Length(new_msg));
    zstr_free(str_msg);


    zmsg_send(msg, worker);
  end;
  result := nil;
end;



procedure doMain;
// Основная задача начинается с настройки сокетов.
// Локальные фронтэнд и бэкэнд общаются с клиентами и рабочими соответственно.
// Облачные фронтэнд и бэкэнд общаются с брокерами - партнерами соответственно
// так же, как если те они были клиентами и рабочими.
// В бэкэнд состояние регулярно публикуются сообщения о собственном состоянии.
// Фронтэнд состояния подписывается на сообщения о состоянии всех бэкэндов
// для сбора сообщений о состоянии.
// Для сбора различных "печатных" сообщений  от задач используется PULL - сокет
// мониторинга.


var
  cloudbe: Pointer;
  cloudfe: Pointer;
  cloud_capacity: Integer;
  ctx: p_zctx_t;
  data: PChar;
  frame: p_zframe_t;
  i: Integer;
  identity: p_zframe_t;
  localbe: Pointer;
  localfe: Pointer;
  local_capacity: Integer;
  monitor: Pointer;
  msg: p_zmsg_t;
  peer: PChar;
  previous: Integer;
  primary: array[0..3] of zmq_pollitem_t;
  random_peer: Integer;
  secondary: array[0..1] of zmq_pollitem_t;
  rc: Integer;
  size: Size_t;
  statebe: Pointer;
  statefe: Pointer;
  status: PChar;
  workers: P_zlist_t;
begin
  if (ParamCount < 2) then begin
    z_Log('syntax: peering3 me {you}...');
    Exit;
  end;
  // Первый аргумент командной строки - адрес брокера
  // Остальные аргументы - адреса партнеров


  self := PChar(ParamStr(1));
  z_Log('I: preparing broker at ' + self);
  Randomize;

  // Подготовка локальных фронтэнда и бэкэнда
  ctx := zctx_new();
  localfe := zsocket_new(ctx, ZMQ_ROUTER);
  rc := zsocket_bind(localfe, 'inproc://localfe_inproc');

  localbe := zsocket_new(ctx, ZMQ_ROUTER);
  rc := zsocket_bind(localbe, 'inproc://localbe_inproc');

  // Привязка облачного фронтэнда к конечной конкретной точке
  cloudfe := zsocket_new(ctx, ZMQ_ROUTER);
  zsocket_set_identity(cloudfe, self);
  zsocket_bind(cloudfe, 'tcp://*:%s', self);

  // Коннект облачного бэкэнда ко всем портнерам
  cloudbe := zsocket_new(ctx, ZMQ_ROUTER);
  zsocket_set_identity(cloudbe, self);
  for i := 2 to ParamCount do begin
    z_Log('I: connecting to cloud frontend at ' + ParamStr(i));
    rc := zsocket_connect(cloudbe, 'tcp://127.0.0.1:%s', PChar(ParamStr(i)));
  end;
  // Привязка бэкэнда состояния к конкретной конечной точке
  statebe := zsocket_new(ctx, ZMQ_PUB);
  zsocket_bind(statebe, 'tcp://*:1%s', self);

  // Фронтэнд состояния подключается ко всем партнерам
  statefe := zsocket_new(ctx, ZMQ_SUB);
  zsocket_set_subscribe(statefe, '');
  for i := 2 to ParamCount do begin
    z_Log('I: connecting to state backend at ' + ParamStr(i));
    rc := zsocket_connect(statefe, 'tcp://127.0.0.1:1%s', PChar(ParamStr(i)));
  end;
  // Подготовка сокета мониторинга
  monitor := zsocket_new(ctx, ZMQ_PULL);
  rc := zsocket_bind(monitor, 'inproc://monitor_inproc');

  // После привязки и коннекта всех наших сокетов стартуем дочерние
  // задачи: рабочих и клиентов

  for i := 0 to Pred(c_NBR_WORKERS) do
    zthread_new(@worker_task, ctx);

  // Запуск локальных клиентов
  for i := 0 to Pred(c_NBR_CLIENTS) do
    zthread_new(@client_task, ctx);

  // Очередь незанятых рабочих
  local_capacity := 0;
  cloud_capacity := 0;
  workers := zlist_new();

  // Главный цикл состоит из двух частей.
  //
  // 1. В любом случае опрашиваем локальных и облачных рабочих и два сервисных
  // сокета (statefe и monitor). Если свободных рабочих нет, то нет смысла
  // даже смотреть на входящие запросы (они останутся во внутренних очередях ZMQ)

  while (true) do begin
    zPollItemInit(primary[0], localbe, 0, ZMQ_POLLIN, 0);
    zPollItemInit(primary[1], cloudbe, 0, ZMQ_POLLIN, 0);
    zPollItemInit(primary[2], statefe, 0, ZMQ_POLLIN, 0);
    zPollItemInit(primary[3], monitor, 0, ZMQ_POLLIN, 0);

    // Если свободных рабочих нет, ждем бесконечно (пока не появятся)
    // Если есть - ждем до 1 секунды
    rc := zmq_poll(@primary[0], 4,
      IfThen(local_capacity > 0, 1000 * ZMQ_POLL_MSEC, -1));
    if rc = -1 then
      Break; // Прерван

    // Отлов изменения количества свободных рабочих в течении этой итерации
    previous := local_capacity;
    msg := nil;

    if (primary[0].revents and ZMQ_POLLIN) <> 0 then begin
      // Ответ от локального рабочего ******************************************
      msg := zmsg_recv(localbe);
      if msg = nil then
        break; // Прерван
      identity := zmsg_unwrap(msg); // Кадр с идентификатором рабочего выдергиваем
      zlist_append(workers, identity); // ... и запоминаем в очереди
      Inc(local_capacity);

      // Если это - сигнал "Готов", сообщение вообще не обрабатывается
      frame := zmsg_first(msg);
      if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then
        zmsg_destroy(msg);
    end
    else if (primary[1].revents and ZMQ_POLLIN) <> 0 then begin
      // ...либо обработать ответ от брокера - партнера ************************
      msg := zmsg_recv(cloudbe);
      if msg = nil then
        break; // Прерван
      // Идентификатор брокера не используется, отбрасываем
      identity := zmsg_unwrap(msg);
      zframe_destroy(identity);
    end;
    // Перенаправление ответа в облако, если он адресован брокеру
    if msg <> nil then
      for i := 2 to ParamCount do begin
        data := PChar(zframe_data(zmsg_first(msg)));
        size := zframe_size(zmsg_first(msg));
        if (size = Length(ParamStr(i))) and (CompareMem(data, PChar(ParamStr(i)), size)) then begin
          zmsg_send(msg, cloudfe);
          Break
        end;
      end;
    // Если сообщение все еще не отправлено, перенаправить его клиенту
    if msg <> nil then
      zmsg_send(msg, localfe);

    // Если в сокетах statefe или в monitor есть входящие сообщения,
    // обрабатываем их немедленно

    if (primary[2].revents and ZMQ_POLLIN) <> 0 then begin
      // Сообщение о статусе партнера ******************************************
      peer := zstr_recv(statefe);
      status := zstr_recv(statefe);
      cloud_capacity := StrToInt(status);
      zstr_free(peer);
      zstr_free(status);
    end;
    if (primary[3].revents and ZMQ_POLLIN) <> 0 then begin
      // Вывод сообщения мониторинга *******************************************
      status := zstr_recv(monitor);
      z_Log(status);
      zstr_free(status);
    end;

    // 2. Теперь распределяем рабочим столько клиентских запросов, сколько
    // мы можем обработать.
    // Если есть локальные мощности, то опрашиваем и localfe и cloudfe.
    // Если есть только облачные мощности, опрашиваем только localfe.
    // Если есть возможность, распределяем задачи локально, иначе распределяем
    // задачи в облако.

    while (local_capacity + cloud_capacity) > 0 do begin
      zPollItemInit(secondary[0], localfe, 0, ZMQ_POLLIN, 0);
      zPollItemInit(secondary[1], cloudfe, 0, ZMQ_POLLIN, 0);

      rc := zmq_poll(@secondary[0], IfThen(local_capacity > 0, 2, 1), 0);
      assert(rc >= 0);

      if (secondary[0].revents and ZMQ_POLLIN) <> 0 then
        msg := zmsg_recv(localfe)
      else if (secondary[1].revents and ZMQ_POLLIN) <> 0 then
        msg := zmsg_recv(cloudfe)
      else
        break; // Работы нет, возвращаемся снова к primary

      if local_capacity > 0 then begin
        frame := zlist_pop(workers); // Добавляем кадр с идентификатором
        zmsg_wrap(msg, frame); // свободного рабочего как адрес получателя
        zmsg_send(msg, localbe);
        Dec(local_capacity); // Рабочий занят
      end
      else begin
        // Отправляем случайному брокеру - партнеру
        // PS: в общем случае, этот партнер может и не иметь свободных
        // рабочих, ибо cloud_capacity мог быть получен от кого угодно по
        // принципу round-robin
        random_peer := Random(ParamCount - 1) + 2;
        zmsg_pushmem(msg, PChar(ParamStr(random_peer)), Length(ParamStr(random_peer)));
        zmsg_send(msg, cloudbe);
      end;
    end; // end of while

    if (local_capacity <> previous) then begin
      // Публикуем сообщения для партнеров о собственной рабочей мощности
      // Для уменьшения "дребезга" делаем это to только при изменении

      // Формируем собственный идентификатор в составе конверта
      zstr_sendm(statebe, self);
      // Публикация нового значения рабочей мощности
      zstr_send(statebe, PChar(IntToStr(local_capacity)));
    end;
  end;
  // Подчистка хвостов при выходе
  while zlist_size(workers) <> 0 do begin
    frame := zlist_pop(workers);
    zframe_destroy(frame);
  end;
  zlist_destroy(workers);
  zctx_destroy(ctx);
end;
begin
  IsMultiThread := True;
  zsys_handler_set(nil);
  doMain;
  Readln;
end.


Эта программа уже не так проста, ее сложность приблизительно оценивается в один человеко-день.

Замечания по коду.


  • Клиентские нити обнаруживают и сообщают о невыполненных запросах. Это делается уже знакомым методом поллинга и, в случае отсутствия ответа в течении 10 секунд, печатается сообщение об ошибке.
  • Клиентские нити сами ничего не печатают, вместо этого они направляют сообщение сокету мониторинга (PUSH), который в цикле собирает их (PULL) и печатает. Это первый случай использования сокетов ZeroMQ для мониторинга и логирования. К этому способу мы еще вернемся.
  • Клиенты имитируют различную нагрузку, чтобы получать доступ к кластеру в 100% случайные моменты времени, чтобы задачи могли попадать в облако. Это управляется количеством клиентов и рабочих а также временем задержки в клиентских и рабочих нитях. Для более точной имитации реальной ити значения можно менять и смотреть, что получится.
  • Основной цикл использует два поллинг - набора. В действительности нужно использовать три: информация о состоянии, бэкэнд (рабочие) и фронтэнд (клиенты). Как и в предыдущих прототипах, сообщения из фронтэнда на сычитываются, если нет свободный рабочих в бэкэнде.
  • Для облачных (между "кластерами") потоков данных в качестве транспорта используется все тот же протокол tcp. Для  локальных потоков данных в этот раз я решил использовать протокол inproc.

     
В процессе разработки этого примера возникли следующие трудности:








  • Из-за потерь сообщений (запросов или ответов) клиенты зависают. Напомню, что сокет ROUTER отбрасывет сообщения, которые не могут быть маршрутизированы. Для решения проблемы был модифицирована клиентская нить, чтобы она обнаруживала такую ситуацию. В процессе поиска проблемы использовался диагностический метод zmsg_dump(), который был размещен после каждого приема и перед каждой отправкой в главном цикле, пока проблема не разрешилась.
  • В основном цикле ошибочно читался более чем один готовый сокет. Из-за этого первое сообщение терялось. Алгоритмс был изменен так, что чтение выполнялось только из первого готового сокета.
  • Класс zmsg неправильно декодирует UUID-ы как строки C. Это происходит из-за того, что UUID-ы содержат нулевый байты. Пофиксено методом замены  UUID-ов на печатные шестнадцатеричные строки.

Эта имитация не обнаружитвает исчезновение облачного партнера. Если вы запустили несколько партнеров а затем остановили один, все остальные будут продолжать отправлять ему задания, как будто он не уходил. Можно попробовать так сделать - сразу увидим кучу сообщений от клиентов насчет потерянных запросов.
Есть два решения проблемы.
Первое: хранить информацию о количестве партеров в течении короткого времени так, что если партнер исчезает, его мощность быстро становится равной нулю.
Второе: добавить надежности в цепочку запрос - ответ.

Далее мы рассмотрим способы повышения надежности. (Продолжение).




Комментариев нет :

Отправить комментарий