суббота, 1 ноября 2014 г.

04. ZeroMQ: картинки из жизни сокетов и базовые сетевые топологии. Немного теории.

(Начало здесь).


Жизнь сокетов ZeroMQ состоит из четырех частей.

  1. Создание и уничтожение, которые должны идти парами: см zmq_socket (), zmq_close ().
  2. Настройка сокета: установить параметры сокета: zmq_setsockopt (), проверить настройки сокета: zmq_getsockopt ().
  3. Подключение сокета в топологию сети путем создания сходящего или исходящего ZeroMQ соединения: zmq_bind (), zmq_connect ().
  4. Использование сокета для передачи данных путем записи и приема сообщений в/из них: zmq_msg_send ()/ zmq_msg_recv ().

Сокеты в Delphi представлены просто указателями (Pointer).
А сообщения zmq_msg_t - структурой (массив длиной 48 байт):

zmq_msg_t = packed record
  _: Array[0..47] of Byte;
end; 

Подключение сокетов

Для создания соединения между двумя узлами на одном узле использует zmq_bind(), а на втором zmq_connect().
Принято считать, что узел, где используется zmq_bind() - это "сервер", который располагается в заранее известной точке сети (т.е., имеет фиксированный сетевой адрес). А узел, где используется zmq_connect() - "клиент", его сетевой адрес заранее неизвестен. Говорят "привязка" сокета ("биндинг") и подключение ("коннектинг"). То есть "привязываем" сокет к конкретной точке, и "подключаем" сокет к конкретной точке, "конкретная точка" - это известный сетевой адрес.

Соединения ZeroMQ отличаются от привычных соединений TCP:



  • Работают по разным транспортным протоколам (inproc, ipc, tcp, pgm, epgm).
  • Один сокет может иметь много исходящих и много входящих соединений.
  • Метода Accept() /zmq_accept() нет! Когда сокет привязывается к конретной точке, Accept() стартует автоматически.
  • Сетевые соединения выполняются в фоне, а ZeroMQ автоматом реконнектится, если сеть обрывается и восстанавливается.
  • Приложение не будет работать с соединением напрямую, только через сокет ZeroMQ.


В ранее рассмотренных примерах мы запускали клиента до запуска сервера, безо всяких проблем.
В обычных сетях мы бы уже получили сообщение, что сервер не готов и т.д.
Но ZeroMQ позволяет запускать и останавливать разные сетевые компоненты в произвольной последовательности. Как только узел - клиент вызывает zmq_connect (), считается, что соединение уже существует, и что узел может начать писать сообщения в сокет.
На определенном этапе (хорошо бы, до момента переполнения очереди сообщений ), сервер оживает, выполняет zmq_bind (), и ZeroMQ начинает доставлять сообщения.

Узел-сервер может биндиться к нескольким конкретным точкам сети. И даже к разным протоколам.

zmq_bind (socket, 'tcp://*:5555');
zmq_bind (socket, 'tcp://*:9999');
zmq_bind (socket, 'inproc://somename');

К конкретной точке сети нельзя биндиться дважды. Если будет запущено два сервера, выполняющие такой код:

zmq_bind (socket, 'tcp://*:9999');


- то работать будет первый, второй будет ждать.

Пишут, однако, что для ipc транспорта допускается множественный биндинг, но нас, дельфистов,  это пока не касается: ipc пока реализован только на всяких линуксах.

По поводу протокола inproc.

Протокол inter-thread (inproc) - это транспорт, основанный на сигналах. Очень быстрый. Но (до версии 4.0.0) с ограничениями: сервер должен выполнить биндинг до того, как клиент попытается законнектиться. Теперь это ограничение снято, но, если вам дорога обратная совместимость Ваших программ, используйте рекомендованную схему: главная нить создает сокет, биндит его к уникальному имени, потом создает дочернюю нить, в котором так же создается сокет, который коннектится к inproc с тем же именем.

В остальном работать с inproc точно так же:

Биндинг в одной нити:

//  Имя - "#1"
zmq_bind(socket, 'inproc://#1');
...
//  Имя - 'my-endpoint'
zmq_bind(socket, 'inproc://my-endpoint');

Конектинг в другой:

zmq_connect(socket, 'inproc://#1');
...
zmq_connect(socket, 'inproc://my-endpoint'); 

То есть: 'имя протокола://уникальное имя коннекта'

Важно! 

Связанные между собой Inproc - сокеты должны работать в одном контексте


Шаблоны сетевых топологий ZeroMQ. 

 

В первой части было представлено и подробно описано приложение, реализующее самый простой шаблона сетевой топологии вроде "Запрос - Ответ".

Во второй части - приложение, реализующее шаблон "Издатель - Подписчик".

В третьей части был представлен шаблон решения задачи методом "Разделяй и Властвуй" (когда задачу разделяем на кусочки и разадаем исполнителям). При этом использовался шаблон "Параллельный трубопровод" (Parallel Pipeline).

Считается, что ZMQ "искаропки" реализует четыре шаблона:

  1. Запрос-Ответ (Request-Reply), в котором соединяются множество клиентов к множеству сервисов ("серверов"). Этот шаблон предназначен для реализации удаленного выполнения каких-либо задач, вроде привычных примитивных клиент-серверных схем.
  2. Издатель - Подписчик (Pub-sub), в котором соединяются множество издателей с множеством подписчиков. Это - шаблон для реализации задачи предоставления данных по подписке.
  3. Трубопровод (Pipeline), в котором соединяются узлы для "проталкивания" (в т.ч. параллельного) сообщений, для задач, требующих в процессе решения множество этапов перемещения (в том числе циклического) данных. Это - шаблон для решения задач вроде разделения большого задания на несколько меньших с последующим сбором результатов.
  4. Эксклюзивная пара (Exclusive pair), в которой соединяются только два сокета. Это - шаблон для решения задач вроде связи двух нитей в процессе.
Четвертый шаблон ("Эксклюзивная пара") представляется слишком очевидным для того, чтобы делать под него отдельно приложение, использовать его можно по только по iproc- протоколу. Сокеты при этом создаются с опцией ZMQ_PAIR.

 В каждом из представленных приложениях сокеты настраивались в соответствии с задачей.
Для пар сокетов ZMQ есть следующие допустимые комбинации настроек:


    PUB    - SUB
    REQ    - REP
    REQ    - ROUTER
    DEALER - REP
    DEALER - ROUTER
    DEALER - DEALER
    ROUTER - ROUTER
    PUSH   - PULL
    PAIR   - PAIR

Половину из этих комбинаций мы уже знаем. :)
Остальные я еще только собираюсь пощупать.

Из документации:
Использование пар сокетов ZMQ в других комбинациях повлечет появление недокументированных эффектов и, возможно, чудовищные разрушения оборудования.

Поговорим о СООБЩЕНИЯХ ZMQ

В рассмотренных ранее тестовых приложениях мы пользовались методами zmq_send () и zmq_recv (). В их параметрах мы указывали сокет, адрес буфера с данными и длину данных. Все незатейливо. Беда в том, что zmq_recv() нельзя использовать, если не знаешь заранее размер принимаемого сообщения: если сообщение не поместится, хвост отсечётся.

Так вот, в API ZMQ есть чудесные методы для работами со структурой zmq_msg_t.
Здесь уже возможностей больше (но и кодить придется больше):

Инициализация сообщения: zmq_msg_init(), zmq_msg_init_size(), zmq_msg_init_data().
Отправка и получение соощения: zmq_msg_send(), zmq_msg_recv().
Освобождение (ресурсов) сообщения: zmq_msg_close().
Доступ к данным сообщения: zmq_msg_data(), zmq_msg_size(), zmq_msg_more().
Работа со свойствами сообщения: zmq_msg_get(), zmq_msg_set().
Манипуляции с сообщениями: zmq_msg_copy(), zmq_msg_move().

Как уже было сказано, в памяти сообщение представляется собой структуру zmq_msg_t (массив длиной 48 байт):

zmq_msg_t = packed record
  _: Array[0..47] of Byte;
end; 

Особенности работы с сообщениями ZMQ:
  • вы создаете и передаете в работу только объекты zmq_msg_t, а не блоки данных;
  • для чтения сообщения, сначала вызываем zmq_msg_init() (создается пустое сообщение), а затем передаем его в zmq_msg_recv();
  • для инициализации сообщения вашими данными, необходимо вызвать zmq_msg_init_size() - будет создано сообщение и выделен блок данных указанного размера. Затем вы "руками" заполняете данные и передаете сообщение в zmq_msg_send().
  • для освобождения (для очистки, а не для удаления объекта zmq_msg_t), вызываем zmq_msg_close(). Это действие сбросит ссылку в ZMQ на структуру и для ZMQ соощение перестанет существовать.
  • для доступа к содержимому сообщения, используем zmq_msg_data(). Для получения длины блока данных в байтах используем zmq_msg_size().

Внимание! Операции zmq_msg_move(), zmq_msg_copy(), zmq_msg_init_data() опасны разрушением контекста, если вы не знаете, для чего и как их использовать.

После того, как сообщение было передано в zmq_msg_send(), ZMQ очистит сообщение, то есть, например, заполнит структуру нулями.

PS: проверка показала, что непонятно, что там делается реально - сообщение то зануляется, то нет. Ну, раз сказано, значит, так и есть.

Вы не можете отправить одно и то же сообщение дважды, точно так же вы не можете получить доступ к блоку данных сообщения после его отправки.

Для zmq_send() и zmq_recv() эти правила не работают, тут вы пересылаете свой массив байт, а не структуру zmq_msg_t.

Если все же нужно отправить одно и то же сообщение более, чем один раз, и оно довольно большое, создаем еще одно сообщение, инициализируем его с помощью zmq_msg_init(), а потом используем zmq_msg_copy(). Будет создана копия исходного сообщения. Такой способ не создает копию сообщения, а просто создает копию ссылки на сообщение.
Теперь это сообщение можно отправить дважды (если нужно больше - создаем больше копий):

  zmq_msg_init_size(fMsg1, 9999 * SizeOf(Char)); // Инициализация
  FillChar(zmq_msg_data(fMsg2)^, 9999, 'ё'); // Запонение буквами ё

  zmq_msg_init(fMsg2); // Создание копии
  zmq_msg_copy(fMsg2, fMsg1);

  zmq_msg_send(fMsg1, fSocketSender, 0);
  zmq_msg_send(fMsg2, fSocketSender, 0);


Сообщение будет полностью удалено только после того, как будет отправлена последняя копия.
 
ZeroMQ также поддерживает сообщения, состоящие из нескольких частей (фрагментов, кадров, фреймов).

В этом случае в одном сообщении содержится несколько кадров.
Для однокадровых сообщений структура пересылаемых данных очень проста:
[Длина блока данных][...блок данных ...]
Составные (многокадровые) сообщения состоят из нескольких таких кадров, например
 
3 'ABC'    // Длина: 3 байта + строка из трех однобайтных символов
0          // Длина: 0 байт (пустой кадр)
6 'Привет' // Длина: 6 байт + строка из шести однобайтных символов

Составные сообщения широко используются в реальных приложениях, их применение будет рассмотрено позднее. Пока просто знаем, что такие - бывают.

Фреймы представляют собой базовый для ZMQ формат, в котором данные передаются по выбранному транспорту.

Длина - от нуля и дальше.

PS: фреймы - это несомненная польза с т.зр. программистов, использовавших TCP, для которых всегда был актуален вопрос: "а сколько мне нужно прочитать данных из данного сокета в данных момент?".
Такой покадровый обмен данными называется ZMTP протоколом.

Обычно сообщения ZeroMQ состоит из одного кадра, наподобие датаграммы UDP.
Для превращения сообщения в мультикадровое ZMQ просто устанавливает в кадре бит "есть ещё!". Затем читаем следующие кадры, пока не получим последний кадр со сброшенным битом.

И так.

  • Сообщение может состоять из одного и более частей.
  • Части сообщения называются кадры/фреймы (frames).
  • Каждая часть есть представляется объектом zmq_msg_t.
  • Программист принимает и отправляет каждую часть отдельно (используя API нижнего уровня).
  • API высших уровней обеспечивают обертками для отправки составных сообщений целиком.

Кроме того:

Можно посылать сообщения нулевой длины. Например, для отправки сигнала из одного потока в другой. Или - как мы делали в приложении Worker:

zmq_send(fSocketSender, PChar(fDummy)^, 0, 0); // Отправка сигнала сборщику результата
ZeroMQ гарантирует доставку либо всех частей сообщения, либо ни одной части.

ZeroMQ не отправляет сообщение (ни простое, ни составное) прямо сразу, а выполняет это с задержкой. То есть, нужно учитывать, что составные сообщения должны помещаться в памяти.

Еще раз: сообщение (простое или составное) должно помещаться в памяти. Если вам нужно отсылать здоровенные файлы, то разбейте их на части и отправьте каждую часть в сообщении, состоящем из одной части, использование составного сообщения не позволит экономить память.

После использования принятого сообщения, его нужно закрыть: zmq_msg_close().
При отправке сообщения этот метод вызывать не нужно. То есть, чисткой занимается получатель.

Ах, да.

Есть удивительный метод zmq_msg_init_data(). Этот метод позволяет инициализировать сообщение без копирования данных пользователя. Для скорости, с целью экономии памяти и т.д.
Так вот, пока не используем его, если не хотим проблем (например: для использования метода нужно определить деаллокирующую данные функцию, да не простую, а thread-safe).
В документации сказано то же самое.


Составные сообщения.

Процесс доставки сообщения атомарен.

Атомарен.
То есть, если на принимающей стороне API сообщило, что пришло сообщение, это значит, что сообщение уже полностью принято (и присутствует в памяти принимающей стороны). А если в процессе пересылки возникнут какие-то проблемы, то принимающая сторона об этом даже не догадается, до приемника сообщение просто не дойдет.
Гарантированной доставки нет. О гарантированной доставке нужно заботиться программисту - с помощью нумерации сообщений, подтверждающих запросов, таймаутов и проч.




Так вот, как было сказано ранее, сообщение может быть составным (состоять из нескольких кадров).
В реальных приложениях это полезно - например, для реализации собственных структур и алгоритмов (например, для простых способов сериализации объектов).

Насчет составных сообщений. Для каждой части (кадра) составного сообщения нужен свой экземпляр структуры zmq_msg_t.
Необязательно, чтобы все части (zmq_msg_t) были объявлены одновременно, можно инициализировать очередной кадр и отправлять его с признаком ("будет ещё!"), используя единственный zmq_msg_t. Однако, все кадры сообщения будут накапливаться в памяти передающей стороны до момента отправки.

Например, отправляем составное сообщение из 5 кадров. Можно объявить, инициализировать, заполнить данными, отправить и очистить пять разных zmq_msg_t. А можно работать с одним zmq_msg_t для всех пяти кадров последовательно.

Пример (сообщение из 5 кадров), 5 экземпляров zmq_msg_t:

var
...
  fMsg_1: zmq_msg_t;
  fMsg_2: zmq_msg_t;
  fMsg_3: zmq_msg_t;
  fMsg_4: zmq_msg_t;
  fMsg_5: zmq_msg_t;


begin
...
  zmq_msg_send(fMsg_1, fSocketSender, ZMQ_SNDMORE); // Кадры с признаком "будет ещё!"
  zmq_msg_send(fMsg_2, fSocketSender, ZMQ_SNDMORE);
  zmq_msg_send(fMsg_3, fSocketSender, ZMQ_SNDMORE);
  zmq_msg_send(fMsg_4, fSocketSender, ZMQ_SNDMORE);
  zmq_msg_send(fMsg_5, fSocketSender, 0); // Завершающий кадр

Пример приема сообщения, состоящего из нескольких кадров. Испоьзуется 1 экземпляр zmq_msg_t:

var
...
  fMsg: zmq_msg_t;
...
begin
...
  while True do begin
    zmq_msg_init(@fMsg);
    zmq_msg_recv(@fMsg, fSocketReceiver, 0);
// Обработка кадра сообщения
    if not zmq_msg_more(@fMsg) then
      break; // Это был последний кадр
    zmq_msg_close(@fMsg);
  end;
...


Вещи, которые важны при работе с составными сообщениями:
  • При отправки составного сообщения и первый, и все последующие кадры начнут отправляться "физически" только тогда, когда был отправлен финальный кадр.
  • Если на приеме используется zmq_poll(), то следует иметь в виду, что в момент, когда вы получили первую часть сообщения, все остальные части были уже получены.
  • Вы физически получаете либо все части сообщения, либо ни одного.
  • Каждая часть сообщения есть отдельный элемент zmq_msg_t.
  • Вы получите всегда ВСЕ части сообщения, вне зависимости от того, проверяете ли вы свойство "ещё!".
  • На передающей стороне ZeroMQ последовательно помещает кадры сообщения в очередь, до кадра с признаком "последний" (вернее, без признака "будет ещё!" ), а только потом выполняется пересылка всех кадров сообщения.
  • Не существует иного способа отказаться от частично отправленного сообщения, кроме как закрыть сокет: zmq_close()

~~~~~~~~~~~~~~~~~~~~~~~~

Дальше поговорим о том, как читать сообщения из нескольких конечных точек (из разных сокетов) одновременно. (Продолжение.)

Комментариев нет :

Отправить комментарий