понедельник, 3 ноября 2014 г.

14. ZeroMQ: Схема "Издатель - Подписчик", подробности, структура конверта.



Шаблон "Запрос - Ответ" - подробности.

Ранее несколько раз были немного упомянуты составные сообщения.
Использование составных сообщений дает возможность оформлять сообщения в форме "конвертов", когда адрес отделен от тела сообщения. Сообщения в SUB/PUB сокетах как раз пересылаются в таких конвертах. Наличие адресов позволяет легко организовать двусторонний обмен с помощью средств общего назначения - таких, как API ZMQ и прокси, которые на лету создают, читают и удаляют адреса, не затрагивая "полезные" данные.

В шаблоне "Запрос - Ответ" конверт содержит обратный адрес для ответа. Наличие обратного адреса позволяет получить ответ на запрос.

При использовании сокетов REQ и REP нет никакой нужды создавать конверты самостоятельно; это автоматически делают сами сокеты. 

Для понимания интересно разобрать, как такие конверты используются с сокетом ROUTER.


Простая форма конверта для ответа REPLY.


Обмен данными "запрос-ответ" состоит из сообщения-запроса и, в конечном итоге, из сообщении-ответа.
В простой схеме "запрос-ответ" на один запроса приходится один ответ. 
В более сложных схемах запросы и ответы могут пересылаться асинхронно. Однако, ответный "конверт" всегда одинаковый.

В общем виде, ответный конверт ZMQ всегда состоит из нуля или более обратных адресов, следующих после пустого кадра. (разделителя конвертов). За ними следует тело сообщения - ноль или более кадров.
Конверт создается множеством сокетов, работающих в одной цепочке.


Например, в сокет REQ отправляем запрос "Hello". Сокет REQ создает простейший конверт для ответа без адреса, просто пустой кадр и кадр сообщения, содержащий строку "Hello". Т.обр., сообщение из двух кадров:

№ кадраДлинаСодержание
10
25Hello


Сокет REP, получив конверт, "вскрывает" его: удаляет разделитель конвертов (первый кадр) и передает сообщение приложению.

Если перехватить сетевые данные приложения, мы увидим, что каждый запрос и каждый ответ состоит из двух кадров: пустой кадр, а затем кадр с полезными данными. Для простого случая "REQ-REP" это выглядит не очень полезным. 

Однако, это важно для сокетов типа ROUTER и DEALER.


Расширенная форма конвертов для ответа

Рассмотрим, как работают пары сокетов REQ-REP через прокси (который использует сокеты ROUTER-DEALER), и как это влияет на форма конвертов для ответа. См. приложение.

В общем, совершенно никакой разницы, сколько прокси: ноль, один, два или больше:
[REQ] <-> [REP]
[REQ] <-[ROUTER|DEALER]-> [REP]
[REQ] <-[ROUTER|DEALER]<-[ROUTER|DEALER]-> [REP]

Вот псевдокод того, что делает прокси:

prepare context, frontend and backend sockets
while true:
    poll on both sockets
    if frontend had input:
        read all frames from frontend
        send to backend
    if backend had input:
        read all frames from backend
        send to frontend


Просто код (не псевдо):

 while true do begin
    zmq_poll(@fZMQPoll[0], Length(fZMQPoll), -1);
      // Проверка состояния сокетов из пула
    if (@fZMQPoll[0].revents and ZMQ_POLLIN) <> 0 then
      while True do
      begin // Трансляция сообщний от клиента к сервису
      // Обработка всх частей сообщения
        zmq_msg_init(@fMsg);
        zmq_msg_recv(@fMsg, fSocketFrontEnd, 0);
        fDoMore := zmq_msg_more(@fMSG) <> 0;
        zmq_msg_send(@fMsg, fSocketBackEnd, IfThen(fDoMore, ZMQ_SNDMORE, 0));
        zmq_msg_close(@fMsg);
        if not fDoMore then
          Break; // Это была последняя часть сообщения
      end;
    if (fZMQPoll[1].revents and ZMQ_POLLIN) <> 0 then
      while True do
        // Трансляция сообщний от сервиса к клиенту
      begin // Обработка всх частей сообщения
        zmq_msg_init(@fMsg);
        zmq_msg_recv(@fMsg, fSocketBackEnd, 0);
        fDoMore := zmq_msg_more(fMSG) <> 0;
        zmq_msg_send(fMsg, fSocketFrontEnd, IfThen(fDoMore, ZMQ_SNDMORE, 0));
        zmq_msg_close(fMsg);
        if not fDoMore then
          Break; // Это была последняя часть сообщения
      end;
  end;


Сокет ROUTER, в отличии от всех остальных, отслеживает каждое входящее соединение и сообщает об этом вызывающей стороне. Вызывающая сторона получает уведомление, что она должна проверять идентификатор каждого входящего сообщения, который будет следовать перед сообщением. Идентификатор, который иногда называют адресом, представляет собой всего-навсего двоичную строку, несущую единственную нагрузку: "Это - уникальный дескриптор связи". После этого, когда приложение отправляет сообщение через сокет ROUTER, первым отправляется кадр идентификации.

Вот что сказано об этом в документации по zmq_socket():

Перед передачей в приложение все сообщения, принимаемые сокетом ZMQ_ROUTER , должны предваряться частью собщения, идентифицирующей вызывающего корреспондента. Сообщения принимаются от всех подключенных корреспондентов в соответствии с алгоритмом справедливой очереди (fair-queued). При отправке сообщения, сокет ZMQ_ROUTER должен удалить первую часть сообщения и использовать её для идентификации корреспондента, которому должно быть доставлено сообщение.


В качестве идентификаторов ZMQ v2.* использовала UUID, а начиная с V3.0 используются короткие целые. Изменения улучшили производительность сети, правда, только в случае использования множества прокси - ретрансляторов (что бывает крайне редко).

Сокет ROUTER для каждого соединения генерирует случайное число. То есть, когда к сокету ROUTER в прокси подключается три клиента сокетами REQ, генерируется три случайных числа, по одному на каждый сокет REQ.


Итак, начнем разрабатывать поясняющий пример. Предположим, у сокета REQ трехбатовый идентификатор "ABC". Это подразумевает, что внутренние механизмы сокета ROUTER содержат хэш-таблицу, в которой ищется строка "ABC" и соответствующее данному сокету REQ соединение TCP.
При получении сообщения от сокета ROUTER socket, мы получаем три кадра:
Запрос с одним адресом
Номер кадраДлинаСодержаниеОписание
13ABCИдентификатор соединения
20
Пустой разделяющий кадр
35HelloКадр с данными


Ядро прокси в цикле просто читает данные из одного сокета и передает в другой, то есть буквально эти три кадра и попадают на вход сокета DEALER. Прослушка сетевого трафика показала бы, что эти три фрейма пролетают из сокета DEALER в REP. Сокет REP делает все так же, как было описано ранее: срезает с конверта все, включая новый обратный адрес и передает "Hello" абоненту - получателю.

Следует отметить, что сокет REP одновременно может работать только одним циклом запрос - ответ. Поэтому, если вы попытаетесь прочитать несколько запросов или оправить несколько ответов, строго не придерживаясь цикла "Запрос-Ответ", сокет вернет ошибку.

Теперь вполне понятен и обратный путь сообщения. Когда сервис возвращает ответ, сокет REP заворачивает его в "сохраненный" (при "вскрытии") конверт, и отправляет ответ из трех кадров через сокет DEALER.

Ответ с одним адресом
Номер кадраДлинаСодержаниеОписание
13ABCИдентификатор соединения
20
Пустой разделяющий кадр
35WorldКадр с данными

Далее сокет DEALER читает эти три фрейма и отправляет через сокет ROUTER. Сокет ROUTER берет первый кадр сообщения с идентификатором "ABC" и находит коннект, связанный с ним. Ели коннект найден, наружу перекачивается следующие два кадра - уже знакомый минимальный конверт

№ кадраДлинаСодержание
10
25World


Сокет REQ принимает сообщение, проверяет, что первым кадром идет пустой разделитель, отбрасывает кадр и передает в приложение "World"...

Все просто.

Ну и что?

Следует признать, что вот такие простые схемы "Запрос - Ответ" или даже "Запрос - Ответ с брокером" не так уж часто применяются. Кроме того, нет простого способа восстановления системы после таких вещей как падение сервера (например, из-за бажного кода). 

Методы построения надежных схем "Запрос - Ответ" будут рассмотрены позже.

Сейчас разберемся, как отважная четверка сокетов (REQ-REP-ROUTER-DEALER) борется с конвертами. Это позволит делать всякие полезные вещи.

Итак, мы поняли, что сокет ROUTER использует конверты для обратной пересылки, чтобы определить, какому из клиентских сокетов REQ следует направить обратный ответ. Или, иными словами:

  • Каждый раз сокет ROUTER принимает сообщение, оно с помощью идентификатора сообщает вам, какой корреспондент это сообщение прислал.
  • Вы можете воспользоваться хэш- таблицей (когда идентификатор - это ключевая строка), чтобы отследить вновь подключенного абонента.
  • Сокет ROUTER будет асинхронно, циклически обрабатывать всех корреспондентов, подключившихся к нему, если префикс - идентификатор идентичен первому кадру сообщения.
  • Сокеты ROUTER не обрабатывают конверт полностью. Они ничего не знают о пустых разделителях. Все ,что они делают - это работа с кадром идентификации, который позволяет выяснить, в какой из коннектов следует дальше отправить сообщение.

Что мы знаем про сокеты из схемы "Запрос - Ответ"?


  • Сокеты REQ перед сообщением отправляют в сеть пустой фрагмент-разделитель. Сокеты REQ - синхронные. Сокеты REQ всегда посылают один запрос и всегда ждут одного ответа. Сокеты REQ одновременно общаются только с одним корреспондентом. Если вы подключаете сокет REQ к некольким абонентам, запросы будет отправлены, а ответы получены по одному корреспонденту в каждом цикле запрос - ответ.
  • Сокет REP читает и сохраняет все кадры идентификации до пустого разделительного кадра включительно, затем передает следующий кадр или кадры абоненту. Сокеты REP - синхронные и одновременно общаются только с одним абонентом. Если вы подключаете сокет REP ко множеству абонентов, запросы читаются от корреспондентов последовательно по кругу (in fair fashion), а ответы всегда будут отправляться тому же самому корреспонденту, от которого был последний запрос.
  • Сокет DEALER не обращает на конверт возврата никакого внимания и обрабатывает его как любое составное сообщение. Сокеты DEALER - асинхронные и похожи на комбинацию сокетов PUSH и PULL. Они распределяют отправляемые сообщения по всем соединениям, и принимают сообщения от всех соединений по алгоритму справедливой очереди.
  • Сокет ROUTER не обращает внимания на конверт возврата, как и сокет DEALER. Он создает идентификаторы своих соединений, и передают эти идентификаторы корреспондентам в виде первого кадра каждого входящего соединения. И наоборот, когда корреспондент отправляет сообщение, он использует первый кадр как идентификатор, чтобы найти соединение для отправки. Сокет ROUTERS - асинхронный.


Допустимые комбинации сокетов
От (connect)НаправлениеК(bind)
REQ-->REP
DEALER-->REP
REQ-->ROUTER
DEALER-->ROUTER
DEALER-->DEALER
ROUTER-->ROUTER


Недопустимые комбинации:

От (connect)НаправлениеК(bind)
REQ-->REQ
REQ-->DEALER
REP-->REP
REP-->ROUTER

Рассмотрим эти комбинации подробнее.

"Запрос - Ответ", рабочие комбинации сокетов.
Некоторые особенности.

REQ->REP
Мы уже рассматривали, как клиент REQ общается с сервером REP. Одно замечание: клиент REQ должен быть инициатором потока сообщений. Сервер REP не может начать общение с клиентом REQ, пока тот не пришлет запрос. 

DEALER->REP

Далее. Меняем в клиенте сокет REQ на сокет DEALER. Это дает асинхронного клиента, который может общаться с множеством серверов REP. Если перепишем клиента "Hello World", используя сокет DEALER, мы сможет отправлять любое число запросов "Hello" без ожидания ответов.

Когда мы используем сокет DEALER для общения с сокетом REP, мы должны тщательно эмулировать формировоание конверта, который должен был посылать сокет REQ, или же сокет REP будет отбрасывать сообщение как неправильное. Итак, чтобы отправить сообщение:
  • Отправляем пустой кадр с установленным флагом "MORE".
  • Затем отправляем тело сообщения.

А когда принимаем сообщение, то:
  • Принимаем первый кадр и, если он не пуст - отбрасываем все сообщение.
  • Принимаем следующий кадр и передаем его в приложение.

REQ -> ROUTER

Можем заменить не только REQ на DEALER, но и REP на ROUTER. Это дает нам асинхронный сервер, который может общаться с множеством REQ клиентов одновременно. Если мы сервер перепишем "Hello World", используя сокет ROUTER, мы сможем обрабатывать параллельно любое число запросом "Hello". Мы это уже делали.

Есть два различных способа использования сокетов ROUTER::
  1. как прокси, который переключает сообщения между сокетами frontend и backend.
  2. как приложение, которое читает сообщения и реагирует на них.

В первом случае сокет ROUTER просто читает все кадры, включая искусственный кадр с идентификатором, а затем вслепую передает их
Во втором случае сокет ROUTER должен знать формат конверта возврата, который он отправляет... Так как второй сокет в паре типа REQ то сокет ROUTER получает кадр идентификации, пустой кадр-разделитель и затем - кадр данных.

DEALER -> ROUTER

Теперь заменим сразу оба сокета - и REQ, и REP на DEALER и ROUTER. Получаем самую мощную комбинацию сокетов, в которой DEALER общается с ROUTER. Это дает нам асинхронного клиента, общающегося с асинхронным сервером, когда обе стороны имеют полный контроль над форматом сообщений.

Так как и DEALER и ROUTER могут работать с сообщениями любого формата, нам придется "немного" поработать в качестве проектировщика протокола. Как минимум, нужно решить - будем ли мы эмулировать конверты возврата REQ/REP. Все зависит от того, нужно ли вам в действительности отправлять ответы или нет.

DEALER -> DEALER
Можно поменять не только REP на ROUTER, но и REP на DEALER, если DEALER общается с одним и только одним корреспондентом.

При  замене REP на DEALER приложение становится полностью асинхронным и сможет посылать и принимать любое число запросов и ответов. За это придется заплатить необходимостью управлять формированием конвертами ответа самостоятельно, и получать их либо правильно - иначе вообще ничего работать не будет. 
Пока отмечу, что пара сокетов DEALER -> DEALER - одна из самых хитрых, и хорошо, что в действительности она бывает нужна нечасто.

ROUTER -> ROUTER 

Название пары звучит так, как будто она идеальна для организации соединений N-to-N, но в действительности это наиболее сложная для использования комбинация. В руководстве рекомендуют избегать её, пока не станете докой в ZeroMQ.


Немного поговорим о нерабочих комбинациях сокетов.

Обычно такое случается, например, при попытке подключить клиента к клиенту, или  сервера к серверу. Вроде понятно, но лучше разобраться подробнее.


  • REQ -> REQ: обестороны желают начать отправлять сообщения в другую сторону. Как бы можно придумать, для чего в дно и тоже время такое может понадобиться, но что-то вот не придумывается.
  • REQ -> DEALER: теоретически такое возможно, но все станет плохо, когда появится еще один REQ, так как нет возможности узнать, кому отправлять ответ на исходный запрос. То есть, сокет REQ запутается и/или вернет ответ на сообщение другого клиента.
  • REP _> REP: обе стороны будут ждать другую сторону, пока кто-то первом не пришлет сообщение.
  • REP -> ROUTER: теоретически, сокет ROUTER может инициировать диалог и отправить правильно сформированный запрос, если он знает, какой именно REP сокет подключен (это нужно знать для идентификации соединения). Это повлечет массу ненужного и грязного кода и не даст ничего по сравнению со схемой DEALER -> ROUTER.
The common thread in this valid versus invalid breakdown is that a ZeroMQ socket connection is always biased towards one peer that binds to an endpoint, and another that connects to that. Further, that which side binds and which side connects is not arbitrary, but follows natural patterns. The side which we expect to "be there" binds: it'll be a server, a broker, a publisher, a collector. The side that "comes and goes" connects: it'll be clients and workers. Remembering this will help you design better ZeroMQ architectures.

Выводы.

Главным выводом этих рассуждений о правильных и неправильных комбинациях является то, что соединения сокетов ZeroMQ всегда направлены в сторону одного из корреспондентов, который привязывается к конкретной конечной точке, а другой подключается к нему. Более того, вовсе не так важно, какая сторона "биндится", а какая - "коннектится". Здесь следует обращать внимание на более натуральное поведение: та, сторона, от которой мы ожидаем, что она "там есть" - и будет сервером, брокером, коллектором. А та сторона, подключения которой "приходят и уходят" - будет клиентом или рабочим. Будем об этом помнить, чтобы разбираться в архитектуре использования ZeroMQ.

...

Для дальнейшей работы понадобится инструмент для более детального просмотра сообщений, поступающих в сокет - процедура s_dump(), которая будет читать все части сообщения из сокета и выводить их в консоль. Эта процедура уже добавлена в ZQM_Utils.pas:
 
interface
...
// Читает из сокета сообщение и показывает его в консоли с разбивкой по кадрам
procedure s_dump(aSocket: Pointer);
...
implementation
...
procedure s_dump(aSocket: Pointer);
var
  fZMQMsg: zmq_msg_t;
  fSize: Integer;
  fIsText: Boolean;
  i: Integer;
  fData: PChar;
  fMore: UInt64;
  fMoreSize: size_t;
begin
  Writeln('----------------------------------------');
  while true do begin
        //  Обработка всех частей сообщения fZMQMsg
    zmq_msg_init(@fZMQMsg);
    fSize := zmq_msg_recv(@fZMQMsg, aSocket, 0);

        //  Вывод сообщения fZMQMsg в виде текста или hex
    fData := zmq_msg_data(@fZMQMsg);
    fIsText := True;
    for i := 0 to Pred(fSize) do
      if not (fData[i] in [#32..#127]) then begin
        fIsText := False;
        break
      end;
    Write(Format('[%3u] ', [fSize]));
    for i := 0 to Pred(fSize) do begin
      if (fIsText) then
        Write(fData[i])
      else
        Write(Format('%.2x ', [Integer(fData[i])]))
    end;
    Writeln;
    fMore := 0; //  Сообщение составное?
    fMoreSize := sizeof(fMore);
    zmq_getsockopt(aSocket, ZMQ_RCVMORE, @fMore, fMoreSize);
    zmq_msg_close(@fZMQMsg);
    if fMore = 0 then
      break; //  Последняя часть сообщения
  end;
end;


Пригодится, когда начнем разбирать "конверты".

Сокеты ROUTER, более подробно.

 
Концепция идентификаторов и адресов
  • Идентификация сообщений в ZMQ касается только сокетов ROUTER. 
  • В широком смысле идентификатор представляет собой обратный адрес для ответа в составе конверта сообщений. В большинстве случаев, идентификаторы являются случайным значением, и они локальны в рамках одного сокета ROUTER. Идентификатор - ключевое значение для поиска в хэш-таблице.
  • Независимо от значений идентификаторов, соединения могут иметь физические адреса ("tcp://192.168.55.117:5670"), или логические (UUID или email адрес или любое уникальное ключевое значение).

Приложение, которое общается с абонентами с помощью сокета ROUTER, может преобразовывать логические адреса в идентификаторы с помощью встроенных хэш-таблиц. Так как в момент передачи сообщения сокет ROUTER задает идентификацию соединения для конкретного корреспондента-отправителя, вы можете ответить лишь этому отправителю, а не произвольному абоненту сокета.

Пример крошечного приложения, в котором сообщения пересылаются по inproc протоколу от сокетов REQ к сокету ROUTER:
 
program IC;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ_h, ZMQ_Utils;
var
  fContext: Pointer;
  fSocketSink: Pointer;
  fSocketAnonymous: Pointer;
  fSocketIdentifier: Pointer;
begin
  fContext := zmq_ctx_new();
  fSocketSink := zmq_socket(fContext, ZMQ_ROUTER);
    // Настройка приемника
  zmq_bind(fSocketSink, 'inproc://example');

// 1. Разрешаем 0MQ установить идентификатор
  fSocketAnonymous := zmq_socket(fContext, ZMQ_REQ);
  zmq_connect(fSocketAnonymous, 'inproc://example');
  s_send(fSocketAnonymous, 'ROUTER uses a generated UUID');
  s_dump(fSocketSink); // Смотрим, что на выходе приемника

// 2. Устанавливаем идентификатор самостоятельно
  fSocketIdentifier := zmq_socket(fContext, ZMQ_REQ);
  zmq_setsockopt(fSocketIdentifier, ZMQ_IDENTITY, PChar('PEER2'), 5); //!
  zmq_connect(fSocketIdentifier, 'inproc://example');
  s_send(fSocketIdentifier, 'ROUTER socket uses REQ''s socket identity');
  s_dump(fSocketSink); // Смотрим, что на выходе приемника

  zmq_close(fSocketSink);
  zmq_close(fSocketAnonymous);
  zmq_close(fSocketIdentifier);
  zmq_ctx_destroy(fContext);
  Readln;
end.

Вывод приложения:

Обработка ошибок сокета ROUTER.


Ну, не то чтобы ошибок. 

Если сокет ROUTER не может определить, куда отправить сообщения, он просто удаляет их. Для реальных приложений это, наверное, правильно (клиент отвалился - что тут поделаешь?), но это затрудняет отладку - особенно если обратный конверт для сокета ROUTER формируется "ручками".

Начиная с ZeroMQ v3.2, у сокетов появилась опция для перехвата ошибок: ZMQ_ROUTER_MANDATORY. Устанавливаем ей для сокета ROUTER получаем возможность отловить ситуацию, когда индентификации недостаточно для работы сокета ROUTER, сокет будет сигнализировать ошибкой EHOSTUNREACH.

Замечание: Утверждают, что дельфийские Write/Writeln небезопасны в нитях => добавим безопасную функцию вывода строки в консоль. Снова расширим ZMQ_Utils.pas.
 
// "Потокобезопасно" выводит строку в консоль
procedure z_Log(const aStr : string);

Кроме того, для дальнейших экспериментов с идентификацией соединений добавим еще пару функций:

// Формирует строку указанной длины со случайным заполнением
function s_random(aLen: Integer): string;

//  Устанавливает случайный текстовый идентификатор для сокета
procedure s_set_id(aSocket: Pointer);

Реализация:

procedure s_set_id(aSocket: Pointer);
//  Устанавливает случайный текстовый идентификатор для сокета
begin
  zmq_setsockopt(aSocket, ZMQ_IDENTITY, PChar(s_random(10)), 10 * SizeOf(Char));
end;

function s_random(aLen: Integer): string;
// Формирует случайную текстовую строку
const
  Chars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ';
var
  i: integer;
begin
  Randomize;
  result := '';
  for i := 1 to aLen do
    result := result + Chars[Random(Length(Chars)) + 1];
end;

Далее рассмотрим применение полученных знаний при реализации шаблона "Балансировка нагрузки" : (Продолжение)

Комментариев нет :

Отправить комментарий