среда, 3 декабря 2014 г.

21.2 ZeroMQ: рабочий пример. Межброкерная маршрутизация. Прототипирование потока данных о состоянии.


(Начало - здесь)

Прототипирование потока данных о состоянии


Так как в каждом потоке данных сокетов есть свои маленькие ловушки для раззяв, будем добавлять новые потоки данных в реальный код постепенно, шаг за шагом. Начнем с потока состояния.

Код прототипа обработчика потока состояния:



program Peering1;

{$APPTYPE CONSOLE}

uses
  SysUtils, zmq_h, czmq_h, zmq_Utils;

// Имитация межброкерного взаимодействия (часть 1)
// Прототип потока состояний

procedure doMain;
var
  argn: Integer;
  available: PChar;
  ctx: p_zctx_t;
  items: zmq_pollitem_t;
  peer: string;
  peer_name: PChar;
  rc: Integer;
  self: string;
  statebe: Pointer;
  statefe: Pointer;
begin
// Аргументы командной строки.
// Первый аргумент - имя брокера
// Прочие аргументы - имена  брокеров - партнеров
//
  if (ParamCount < 2) then begin
    z_Log('syntax: peering1 me {you}...');
    exit;
  end;

  self := ParamStr(1);
  z_Log(Format('I: preparing broker at %s...', [self]));
  Randomize;

  ctx := zctx_new();

  // Биндинг бэкэнд-сокета состояния к конечной точке
  statebe := zsocket_new(ctx, ZMQ_PUB);
  rc := zsocket_bind(statebe, 'tcp://127.0.0.1:%s', self);
  Assert(rc >= 0);

// Подключение statefe ко всем партнерам
  statefe := zsocket_new(ctx, ZMQ_SUB);
  zsocket_set_subscribe(statefe, '');
  for argn := 2 to ParamCount do begin
    peer := ParamStr(argn);
    z_Log('I: connecting to state backend at ' + peer);
    rc := zsocket_connect(statefe, 'tcp://localhost:%s', peer);
    Assert(rc >= 0);
  end;
// В главной цикле партнерам рассылаются сообщения о состоянии и собираются
// сообщения о состоянии от партнеров. Таймаут zmq_poll определяет свой
// собственный хартбит:

  while (true) do begin
    // Опрос активности, или 1 сек. таймаута
    items.socket := statefe;
    items.fd := 0;
    items.events := ZMQ_POLLIN;
    items.revents := 0;
    rc := zmq_poll(@items, 1, 1000 * ZMQ_POLL_MSEC);
    if rc = -1 then
      break; // Прерывание

// Обработка входящих сообщений о состоянии
    if (items.revents and ZMQ_POLLIN) <> 0 then begin
      peer_name := zstr_recv(statefe);
      available := zstr_recv(statefe);
      z_Log(Format('%s - %s workers free', [peer_name, available]));
      zstr_free(peer_name);
      zstr_free(available);
    end
    else begin
      //  Отправка (публикация) случайных значений о стостоянии
      zstr_sendm(statebe, PChar(self));
      zstr_send(statebe, PChar(IntToStr(Random(10))));
    end
  end;
  zctx_destroy(ctx);
end;

begin 
//При создании многонитевого Delphi приложения обязательно задавать  
// значение True для переменной System.IsMultyThread. 
  IsMultyThread := True;
 zsys_handler_set(nil); // Ctrl+C doMain; readln; end.

Код рабочий, запускать нужно из командной строки с параметрами:
Peering1.exe первый_параметр второй_и_последующие_параметры

Первый параметр - свой порт, второй и последующие - порты партнеров.
Например:
Peering1.exe 5555 5556
Peering1.exe 5556 5555





Пояснения к коду:

  • Каждый брокер идентифицируется уникальным номером порта, который используется при конструировании адреса конечной точки. Реальный брокер, размещенный на отдельном компьютере, будет идентифицироваться по более сложной схеме. Эти схемы мы рассмотрим позднее, а сейчас просто используем домен localhost (или ip 127.0.0.1).
  • Мы использовали zmq_poll() в качестве ядра программы. Он обрабатывал входящие сообщения и отправлял сообщения о состоянии. Мы отправляем сообщение о состоянии только тогда, когда у нас нет никаких входящих сообщений и мы ожидали 1 секунду. Если мы будем рассылать сообщения каждый раз, получатся бури из сообщений.
  • Мы используем составное (из двух частей) сообщение, содержащее адрес и данные. Нужно знать адрес издателя (того, кто публикует сообщения), чтобы правильно отправлять его задания, и единственным способом сделать это представляется включение адреса в часть сообщения.
  • Мы не назначаем идентификаторов для подписчиков, так как если бы мы это делали, то получали бы устаревшую информацию при подключении к работающим брокерам.
  • Для издателя мы не устанавливаем значение HWM, но это было бы хорошей идеей при использовании ZeroMQ v2.x.
Еще раз запустим нашу программу - например, в трех экземплярах:
 
peering1 5555 5556 5557 #  Старт на порту 5555, коннект к 5556 и 5557
peering1 5556 5555 5557 #  Старт на порту 5556, коннект к 5555 и 5557
peering1 5557 5555 5556 #  Старт на порту 5557, коннект к 5555 и 5556
 
Видим, как каждый кластер сообщает о своем своем запуске и коннекте к партнерам, а через пару секунд они все начинают бодро (примерно раз в секунду) выводить случайные числа "о состоянии" своих партнеров.

В реальной жизни мы сообщения о состоянии не через регулярные интервалы времени, а только тогда, когда произошло изменение этого состояния, например, когда рабочие становятся доступными или недоступными. Может показаться, что это займет много трафика, но сообщения о состоянии маленькие, и мы прияли как факт, что межкластерные соединения очень быстрые.

Если бы мы хотели отправлять сообщения о состоянии через точные интервалы, мы бы создали дочернюю нить с сокетом statebe. Из внешней главной нити мы бы посылали нерегулярные сообщения в эту нить об изменении состояния и разрешили бы дочерней нити объединять эти сообщения в регулярные исходящие сообщения. Для этого пришлось бы проделать больше работы, чем нужно.




Дальше мы выполним прототипирование локального и облачного потоков данных.
(Продолжение).

Комментариев нет :

Отправить комментарий