(Начало здесь)
Проблема динамического обнаружения
Вопрос сродни вот этому: Как получить список доступных MS SQL серверов?
Проблема динамического обнаружения
Вопрос сродни вот этому: Как получить список доступных MS SQL серверов?
Одной из проблем, возникающих при проектировании крупных распределенных систем является обнаружение сервисов. Частный случай проблемы - "как клиент узнает, к какому серверу нужно коннектиться?" А в общем случае - "как элементам сети найти друг друга?"
Особенно трудно в случае, когда разные элементы подключаются и отключаются, из разных узлов сети. Поэтому это называется "проблемой динамического обнаружения".
Эта проблема решается везде по-разному: используется служба DNS, или широковещательные сообщения (UDP) и т.п.
Есть несколько простых решений проблемы динамического обнаружения. Можно жестко закодировать адрес(ip + порт) (т,е., "конечную точку"). Вообще никаких проблем - и никакой гибкости. Впрочем, некоторой гибкости для tcp транспорта можно добавить с помощью службы DNS.
Можно конечную току задавать с помощью конфигурационных файлов (и т.д.). Главное - не забывать их вовремя обновлять

Можно использовать специальный брокер адресации, который передаст вам нужные данные. Только вот адрес этого брокера должен быть известен...
Можно построить средство, исследующее окружение, сканирующее известные диапазоны адресов и порты. Или рассылающее (получающее) udp - пакеты об адресной информации.
...
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Можно при запуске нового элемента сети вручную задавать конечную точку ("адрес сервера"). То есть, при подключении вы просто знаете конфигурацию сети и сообщаете нужные данные новому элементу сети. Так часто делают, но в реальности это приводит к громоздким и хрупким топологиям.
Вспомним пример "Издатель-Подписчик".
Пусть есть один издатель и тысяча подписчиков. Каждый подписчик коннектится к одному издателю. Можно руками конфигурировать подключение каждого подписчика, это просто. Сделали, работает.
Дальше. Подписчик - динамичный (может подключаться из любого места, и включаться когда угодно), а издатель - статичный.
Сделали, работает.
Нужно динамическое обнаружение.
Добавим брокер сообщений. ZMQ не поставляется с брокерами, но позволяет его легко построить.
Теперь Издатель-Подписчик схема будет выглядеть вот так:
Таким образом, со статичным борокером сообщений задача решена.
Можно даже запустить общий брокер для всех типов сообщений, и строить сетевую архитектуру вокруг него.
Т.е., топология "Звезда" работает. Какое-то время.
...пока не возникнут вопросы вроде пропускной способности и расширения/усложнения логики брокера.
АДЪ неминуем.
Решение.
Куда лучше реализовать брокер не в качестве транспорта сообщений, а в качестве поставщика адресной информации.
Для этого следует использовать сокеты ZMQ типа XPUB и XSUB, так как с ними ZMQ не пересылает сообщения от издателя к подписчику напрямую.
Сокеты XSUB и XPUB - точно такие же, как и сокеты типа SUB и PUB, за исключением того, что они обрабатывают подписки в форме специальных сообщений. А при подключении SUB и PUB сокетов к XPUB и XSUB сокетам первые связываются друг с другом уже по известным адресам.
То есть, основной поток данных идет, минуя брокер:
PS: с сокетами XPUB/XSUB я пока не работал. Возможно, все не так, как здесь описано, так как я просто плохо понял документацию на английском. Но я обезательно разберусь и отпишусь здесь.
....
....
....
Шаблон "Разделяемая очередь".
Потихоньку перейдем к сокетам DEALER и ROUTER.
...
В реальности может потребоваться, чтобы множество клиентов могли подключаться к разным сервисам (например, для распределения нагрузки по сервисам).
Для реализации коннектов "много:много" есть два пути:
- каждый клиентский сокет может коннектиться ко множеству сервисных конечных точек. То есть, один клиентский сокет типа (REQ) коннектится к сервисным сокетам с известными адресами. После этого запросы будут распределяться между сервисами.
Например, коннектим клинтский сокет к трем конечным точкам ("сервисам") :A, B, и C.
Вспомним этот пример, поясняющий схему "Запрос - Ответ" (REQ - REP). Чтобы этот клиент подключился к трем сервисам (например, по tpc на localhost, на трех разных портах), сокет следует приконнектить ко всем трем конечным точкам:
zmq_connect(requester, 'tcp://localhost:5555'); zmq_connect(requester, 'tcp://localhost:5556'); zmq_connect(requester, 'tcp://localhost:5557');
Клиент последовательно выполняет запросы R1, R2, R3, R4. В результате запросы R1 и R4 отправляются к сервису A, R2 - к B, R3 - к C. Циклически распределяя запросы (наш любимый roud-robin). Такая конструкция позволяет добавлять без проблем добавлять сколько угодно клиентов.
Как показано выше, с помощью zmq_connect() можно добавлять сколько угодно сервисов. Беда в том, что клиент должен знать, где находится новый сервис. Если клиентов - 100, и в течении скажем, суток, добавляется всего три новых сервиса, то нужно в итоге триста раз переконфигурировать всех клиентов.
Что грустно.
В идеале, мы должны быть в состоянии добавлять и удалять сервисы или клиентов в любое время, не касаясь любой другой части топологии.
...
Для реализации коннектов "много:много" есть два пути:
- Каждый клиентский сокет может коннектиться ко множеству сервисных конечных точек. То есть, один клиентский сокет типа (REQ) коннектится к сервисным сокетам с известными адресами. После этого запросы будут распределяться между сервисами.
- Второй путь - использование брокера запросов как промежуточного слоя.
Напишем крошечный брокер запросов, реализующий желаемую гибкость топологии.
Брокер соединит две конечные точки - фронтенд (сокет стороны клиентов) и бэкэнд (сокет стороны сервисов).
Затем брокер, используя zmq_poll(), будет отслеживать активность этих сокетов, и перебрасывать сообщения от одного сокета к другому. При чем, в ручном управлении очередности использования сервисов нет необходимости, т.к. ZeroMQ делает это автоматически для каждого сокета.
Когда мы построили приложение по схеме "Запрос - Ответ (Req_Rep)", система получилась с синхронным диалогом обмена. Клиент шлет запрос. Сервис читает запрос и шлет ответ. Клиент читает ответ. Если клиент или сервис будут выполнять что-то другое (например, клиент пошлет два запроса подряд без ожидания ответа), система просто перестанет работать.
...
Конечно, раз теперь мы умеем пользоваться zmq_poll(), мы можем сделать брокер неблокирующим.
Но мы пойдем другим путем, и вообще не станем использовать сокеты типа REP и REQ.
Так вот, есть схема использования пар сокетов, которые реализуют схему "Посредник - Маршрутизатор"; режимы сокетов соответственно называются DEALER и ROUTER. Они позволяют получить неблокирующий режим для схемы "Запрос - Ответ".
В нашей схеме "Запрос - Ответ" сокет REQ будет "говорить" с сокетом ROUTER, а сокет DEALER - с сокетом REP.
Сокеты DEALER и ROUTER будет как раз размещаться на нашем брокере, а передачу сообщений между ними мы обеспечим с помощью кода. Будем извлекать сообщение из одного сокета и передавать его другому сокету.
Наш брокер схемы "Запрос - Ответ" привязывается к двум конечным точкам: одна для коннектов к ней клиентов(фронтэнд сокет), вторая - для коннектов сервисов(бэкэнд сокет).
Задача та же, что и в первом примере: возведение в квадрат целых чисел. Клиент отсылает запросы, сервис (сервер) - выполняет. Запрос - беззнаковое x32 целое число (UInt32) , ответ - квадрат беззнаковое x64 целого (UInt64).
Вот наш немудрёный REQ - клиент :
program BrRR_Client; {$APPTYPE CONSOLE} // Клиент // Коннектится сокетом REQ к tcp://localhost:5559 // Шлет целое число сервису (серверу), обратно получает квадрат числа uses SysUtils, ZMQ_h; const c_ReqCnt = 100; var fContext: Pointer; fSocketRequester: Pointer; fSrcVal: Cardinal; fResultVal: UInt64; i: integer; begin fContext := zmq_ctx_new(); // Инициализация fSocketRequester := zmq_socket(fContext, ZMQ_REQ); zmq_connect(fSocketRequester, 'tcp://localhost:5559'); // Коннект к сервису Randomize(); for i := 0 to Pred(c_ReqCnt) do begin fSrcVal := Cardinal(Random(-1)); // Генерация случайного целого zmq_send(fSocketRequester, @fSrcVal, SizeOf(fSrcVal), 0); // Запрос Write('Iter: ', i, ' src=', fSrcVal); zmq_recv(fSocketRequester, @fResultVal, SizeOf(fResultVal), 0); // Ответ Writeln(' result=', fResultVal); end; zmq_close(fSocketRequester); zmq_ctx_destroy(fContext); Readln; end.
А вот - REP-сервер (сервис):
program BrRR_Service; {$APPTYPE CONSOLE} // Сервис (сервер) // Находится в известной клиентам конечной точке, связывает сокет REP с tcp:*:5560, // Получает целое число, возводит в квадрат и отправляет обратно результат uses SysUtils, ZMQ_h; var fContext: Pointer; fSocketResponder: Pointer; fSrcVal: Cardinal; fResultVal: UInt64; i: integer; begin fContext := zmq_ctx_new(); // Инициализация fSocketResponder := zmq_socket(fContext, ZMQ_REP); zmq_bind(fSocketResponder, 'tcp://*:5559'); // Привязка к конечной точке Writeln('Starting service...'); i := 0; while True do begin zmq_recv(fSocketResponder, @fSrcVal, SizeOf(fSrcVal), 0); // Запрос Write('Iter: ', i, ' src=', fSrcVal); fResultVal := UInt64(fSrcVal) * UInt64(fSrcVal); // "Полезная работа" zmq_send(fSocketResponder, @fResultVal, SizeOf(fResultVal), 0); // Ответ Writeln(' result=', fResultVal); Inc(i); end; zmq_close(fSocketResponder); zmq_ctx_destroy(fContext); Readln; end.
Пока ничего нового не видим: продублирована функциональность схемы "Вопрос - Ответ". Клиент - коннектится к конечной точке - к сервису, сервис находится в этой конечной точке и ждет запросов клиентов. "Конечная точка" - это известный адрес (Например, "tcp://localhost:5560"). То есть, реализована старая схема "Запрос-Ответ"/REQ-REP.
Будем улучшать мир. Воткнем между ними брокер.
Брокер.
Что-то новенькое: сокеты DIALER и ROUTER.
program BrRR_Broker; {$APPTYPE CONSOLE} // Брокер // Находится в известной клиентам и сервисам конечной точке, // Находится в известной клиентам конечной точке, // связывает сокет ZMQ_ROUTER с tcp:*:5559, // связывает сокет ZMQ_DEALER с tcp:*:5560, // Перекидывает сообщения между сокетами от FrontEnd к BackEnd // и обратно uses SysUtils, ZMQ_h, Math; var fContext: Pointer; fSocketFrontEnd: Pointer; fSocketBackEnd: Pointer; fZMQPoll: array[0..1] of zmq_pollitem_t; fMsg: zmq_msg_t; fDoMore: Boolean; begin fContext := zmq_ctx_new(); // Инициализация fSocketFrontEnd := zmq_socket(fContext, ZMQ_ROUTER); fSocketBackEnd := zmq_socket(fContext, ZMQ_DEALER); zmq_bind(fSocketFrontEnd, 'tcp://*:5559'); // Конечная точка для клиентов zmq_bind(fSocketBackEnd, 'tcp://*:5560'); // Кончная точка для сервисов fZMQPoll[0].socket := fSocketFrontEnd; // Инициализация пула сокетов fZMQPoll[0].fd := 0; fZMQPoll[0].events := ZMQ_POLLIN; fZMQPoll[0].revents := 0; fZMQPoll[1].socket := fSocketBackEnd; fZMQPoll[1].fd := 0; fZMQPoll[1].events := ZMQ_POLLIN; fZMQPoll[1].revents := 0; while true do begin zmq_poll(@fZMQPoll[0], Length(fZMQPoll), -1); // Проверка состояния сокетов из пула if (fZMQPoll[0].revents and ZMQ_POLLIN) <> 0 then while True do begin // Трансляция сообщний от клиента к сервису // Обработка всх частей сообщения zmq_msg_init(@fMsg); zmq_msg_recv(@fMsg, fSocketFrontEnd, 0); fDoMore := zmq_msg_more(@fMSG) <> 0; zmq_msg_send(@fMsg, fSocketBackEnd, IfThen(fDoMore, ZMQ_SNDMORE, 0)); zmq_msg_close(@fMsg); if not fDoMore then Break; // Это была последняя часть сообщения end; if (fZMQPoll[1].revents and ZMQ_POLLIN) <> 0 then while True do // Трансляция сообщний от сервиса к клиенту begin // Обработка всх частей сообщения zmq_msg_init(@fMsg); zmq_msg_recv(@fMsg, fSocketBackEnd, 0); fDoMore := zmq_msg_more(@fMSG) <> 0; zmq_msg_send(@fMsg, fSocketFrontEnd, IfThen(fDoMore, ZMQ_SNDMORE, 0)); zmq_msg_close(@fMsg); if not fDoMore then Break; // Это была последняя часть сообщения end; end; zmq_close(fSocketFrontEnd); zmq_close(fSocketBackEnd); zmq_ctx_destroy(fContext); Readln; end.Не работает система. :(
Еще бы.
Чтобы все заработало, нужно у сервиса заменить биндинг на коннект:
// zmq_bind(fSocketResponder, 'tcp://*:5559'); // Привязка к конечной точке zmq_connect (fSocketResponder, 'tcp://localhost:5560'); // Коннект к конкретной точке
Теперь все в порядке.
Брокер в асинхронном режиме слушает пул из сокетов с помощью zmq_poll(), затем читает (возможно, по частям) сообщение и транслирует его в выходной сокет. В обе стороны.
Чтение по частям реализовано "для общности". Чтобы работало с любыми сообщениями.
Интересно, что наши крошечные сообщения
zmq_send(fSocketRequester, fSrcVal, SizeOf(fSrcVal), 0); // Запрос ... zmq_recv(fSocketRequester, fResultVal, SizeOf(fResultVal), 0); // Ответ
и
zmq_recv(fSocketResponder, fSrcVal, SizeOf(fSrcVal), 0); // Запрос ... zmq_send(fSocketResponder, fResultVal, SizeOf(fResultVal), 0); // Ответ
порождают на стороне брокера каскад из нескольких составных сообщений из структур zmq_msg_t, что хорошо видно в отладчике.
Теперь можно запускать сколько хочешь сервисов и сколько хочешь клиентов - все они будут общаться через брокер.
Такой брокер для схемы "Запрос- Ответ" существенно облегчает обслуживание сети , так как клиентам нет нужды знать, где размещены сервисы, а сервисам нет нужны знать, где размещены клиенты. Единственной статической точкой является брокер.
Жизнь налаживается. :)
Теперь топология сети выглядит так:
Приведенный выше код трансляции сообщений представляется очень полезным для многих случаев для схемы "Запрос - Ответ".
Так вот, ZeroMQ есть даже специальный метод, реализующий все, что мы нако'дли в брокере.
Важно!:
function zmq_proxy( frontend, backend, capture: Pointer ): Integer;
frontend - сокет фронтэнд
backend - сокет бэкэнд
capture - сокет для перехвата сообщений (nil, если не используется)
Технически нет никакой разницы между фронтэнд и бэкэнд сокетами.
Та-да-мммм!:
Правда, классно? :)

Важно.
Видов сокетов, которые практически можно использовать в брокере:
ROUTER - DEALER
XSUB - XPUB
PULL - PUSH.
Мосты.
Вопрос: "как передать сообщения из одной подсети в другую?"
Или даже - из сети с протоколом tcp в сеть pgm.
Вариант решения - с помощью моста.
В качестве моста используем только что рассмотренный прокси (брокер сообщений).
То есть, "мост" - это маленькое приложение, которое общается одним сокетом по одному протоколу, а другим - по другому.
Ну и преобразовывает сообщения в подходящий для протокола вид, если нужно.
В качестве примера напишем крошечный прокси, который, находясь между издателем и множеством подписчиков, соединяет две разные сети.
Вернемся к примеру с метеостанцией.
Предположим, что сервер-издатель (которые измеряет температуру, давление и проч) работает во внутренней сети, часть подписчиков - тоже во внутренней. А еще часть - во внешней.
Создаем прокси, в которой фронтэнд сокет (SUB) будет общаться с внутренней сетью, а бэкэнд сокет (PUB) - с внешней.
Прокси будет подписываться фронтэнд-сокетом на события сервиса погоды и пере-публиковывать их на бэкэнд-сокете.
Прокси для Метео: "Мост В Интернет" :
program BrRR_BrokerMeteoInterNet; {$APPTYPE CONSOLE} // Брокер для проекта "Метео" // Реализует мост между интрасаетью метеосервера // и внешней сетью. Размещается во внутренней сети, // для брокера открыт "наружу" tcp порт 8100. // "Внешние" клиенты коннектятся к известной конечной точке tcp://10.1.1.0:8100 // Коннектит сокет ZMQ_XSUB с известной конечной точкой метеосервера tcp://192.168.55.210:5556 // Связывает сокет ZMQ_XPUB с tcp://10.1.1.0:8100, // Перекидывает сообщения подписки между сокетами от FrontEnd к BackEnd // и обратно uses SysUtils, ZMQ_h; var fContext: Pointer; fSocketFrontEnd: Pointer; fSocketBackEnd: Pointer; begin fContext := zmq_ctx_new(); // Инициализация fSocketFrontEnd := zmq_socket(fContext, ZMQ_XSUB); fSocketBackEnd := zmq_socket(fContext, ZMQ_XPUB); zmq_connect(fSocketFrontEnd, 'tcp://192.168.55.210:5556'); // Конечная точка метеосервиса zmq_bind(fSocketBackEnd, 'tcp://10.1.1.0:8100'); // Конечная точка для "внешних" подписчиков // zmq_bind(fSocketBackEnd, 'tcp://*:8100'); // Можно и так, главное - чтобы внешние клиенты знали адрес zmq_proxy (fSocketFrontEnd, fSocketBackEnd, nil); // Старт прокси zmq_close(fSocketFrontEnd); zmq_close(fSocketBackEnd); zmq_ctx_destroy(fContext); Readln; end.
Очень похоже на код предыдущего прокси, но используется для трансляции сообщений из одной подсети в другую. Точно так же подобный прокси - мост можно использовать, например, для подключения подписчиков в мультикаст PGM сети с издателем в TCP.
...
Теперь поговорим об обработке ошибок. (Продолжение).
Комментариев нет :
Отправить комментарий