(Начало здесь)
Проблема динамического обнаружения
Вопрос сродни вот этому: Как получить список доступных MS SQL серверов?
Проблема динамического обнаружения
Вопрос сродни вот этому: Как получить список доступных MS SQL серверов?
Одной из проблем, возникающих при проектировании крупных распределенных систем является обнаружение сервисов. Частный случай проблемы - "как клиент узнает, к какому серверу нужно коннектиться?" А в общем случае - "как элементам сети найти друг друга?"
Особенно трудно в случае, когда разные элементы подключаются и отключаются, из разных узлов сети. Поэтому это называется "проблемой динамического обнаружения".
Эта проблема решается везде по-разному: используется служба DNS, или широковещательные сообщения (UDP) и т.п.
Есть несколько простых решений проблемы динамического обнаружения. Можно жестко закодировать адрес(ip + порт) (т,е., "конечную точку"). Вообще никаких проблем - и никакой гибкости. Впрочем, некоторой гибкости для tcp транспорта можно добавить с помощью службы DNS.
Можно конечную току задавать с помощью конфигурационных файлов (и т.д.). Главное - не забывать их вовремя обновлять .
Можно использовать специальный брокер адресации, который передаст вам нужные данные. Только вот адрес этого брокера должен быть известен...
Можно построить средство, исследующее окружение, сканирующее известные диапазоны адресов и порты. Или рассылающее (получающее) udp - пакеты об адресной информации.
...
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Можно при запуске нового элемента сети вручную задавать конечную точку ("адрес сервера"). То есть, при подключении вы просто знаете конфигурацию сети и сообщаете нужные данные новому элементу сети. Так часто делают, но в реальности это приводит к громоздким и хрупким топологиям.
Вспомним пример "Издатель-Подписчик".
Пусть есть один издатель и тысяча подписчиков. Каждый подписчик коннектится к одному издателю. Можно руками конфигурировать подключение каждого подписчика, это просто. Сделали, работает.
Дальше. Подписчик - динамичный (может подключаться из любого места, и включаться когда угодно), а издатель - статичный.
Сделали, работает.
Нужно динамическое обнаружение.
Добавим брокер сообщений. ZMQ не поставляется с брокерами, но позволяет его легко построить.
Теперь Издатель-Подписчик схема будет выглядеть вот так:
Таким образом, со статичным борокером сообщений задача решена.
Можно даже запустить общий брокер для всех типов сообщений, и строить сетевую архитектуру вокруг него.
Т.е., топология "Звезда" работает. Какое-то время.
...пока не возникнут вопросы вроде пропускной способности и расширения/усложнения логики брокера.
АДЪ неминуем.
Решение.
Куда лучше реализовать брокер не в качестве транспорта сообщений, а в качестве поставщика адресной информации.
Для этого следует использовать сокеты ZMQ типа XPUB и XSUB, так как с ними ZMQ не пересылает сообщения от издателя к подписчику напрямую.
Сокеты XSUB и XPUB - точно такие же, как и сокеты типа SUB и PUB, за исключением того, что они обрабатывают подписки в форме специальных сообщений. А при подключении SUB и PUB сокетов к XPUB и XSUB сокетам первые связываются друг с другом уже по известным адресам.
То есть, основной поток данных идет, минуя брокер:
PS: с сокетами XPUB/XSUB я пока не работал. Возможно, все не так, как здесь описано, так как я просто плохо понял документацию на английском. Но я обезательно разберусь и отпишусь здесь.
....
....
....
Шаблон "Разделяемая очередь".
Потихоньку перейдем к сокетам DEALER и ROUTER.
...
В реальности может потребоваться, чтобы множество клиентов могли подключаться к разным сервисам (например, для распределения нагрузки по сервисам).
Для реализации коннектов "много:много" есть два пути:
- каждый клиентский сокет может коннектиться ко множеству сервисных конечных точек. То есть, один клиентский сокет типа (REQ) коннектится к сервисным сокетам с известными адресами. После этого запросы будут распределяться между сервисами.
Например, коннектим клинтский сокет к трем конечным точкам ("сервисам") :A, B, и C.
Вспомним этот пример, поясняющий схему "Запрос - Ответ" (REQ - REP). Чтобы этот клиент подключился к трем сервисам (например, по tpc на localhost, на трех разных портах), сокет следует приконнектить ко всем трем конечным точкам:
zmq_connect(requester, 'tcp://localhost:5555'); zmq_connect(requester, 'tcp://localhost:5556'); zmq_connect(requester, 'tcp://localhost:5557');
Клиент последовательно выполняет запросы R1, R2, R3, R4. В результате запросы R1 и R4 отправляются к сервису A, R2 - к B, R3 - к C. Циклически распределяя запросы (наш любимый roud-robin). Такая конструкция позволяет добавлять без проблем добавлять сколько угодно клиентов.
Как показано выше, с помощью zmq_connect() можно добавлять сколько угодно сервисов. Беда в том, что клиент должен знать, где находится новый сервис. Если клиентов - 100, и в течении скажем, суток, добавляется всего три новых сервиса, то нужно в итоге триста раз переконфигурировать всех клиентов.
Что грустно.
В идеале, мы должны быть в состоянии добавлять и удалять сервисы или клиентов в любое время, не касаясь любой другой части топологии.
...
Для реализации коннектов "много:много" есть два пути:
- Каждый клиентский сокет может коннектиться ко множеству сервисных конечных точек. То есть, один клиентский сокет типа (REQ) коннектится к сервисным сокетам с известными адресами. После этого запросы будут распределяться между сервисами.
- Второй путь - использование брокера запросов как промежуточного слоя.
Напишем крошечный брокер запросов, реализующий желаемую гибкость топологии.
Брокер соединит две конечные точки - фронтенд (сокет стороны клиентов) и бэкэнд (сокет стороны сервисов).
Затем брокер, используя zmq_poll(), будет отслеживать активность этих сокетов, и перебрасывать сообщения от одного сокета к другому. При чем, в ручном управлении очередности использования сервисов нет необходимости, т.к. ZeroMQ делает это автоматически для каждого сокета.
Когда мы построили приложение по схеме "Запрос - Ответ (Req_Rep)", система получилась с синхронным диалогом обмена. Клиент шлет запрос. Сервис читает запрос и шлет ответ. Клиент читает ответ. Если клиент или сервис будут выполнять что-то другое (например, клиент пошлет два запроса подряд без ожидания ответа), система просто перестанет работать.
...
Конечно, раз теперь мы умеем пользоваться zmq_poll(), мы можем сделать брокер неблокирующим.
Но мы пойдем другим путем, и вообще не станем использовать сокеты типа REP и REQ.
Так вот, есть схема использования пар сокетов, которые реализуют схему "Посредник - Маршрутизатор"; режимы сокетов соответственно называются DEALER и ROUTER. Они позволяют получить неблокирующий режим для схемы "Запрос - Ответ".
В нашей схеме "Запрос - Ответ" сокет REQ будет "говорить" с сокетом ROUTER, а сокет DEALER - с сокетом REP.
Сокеты DEALER и ROUTER будет как раз размещаться на нашем брокере, а передачу сообщений между ними мы обеспечим с помощью кода. Будем извлекать сообщение из одного сокета и передавать его другому сокету.
Наш брокер схемы "Запрос - Ответ" привязывается к двум конечным точкам: одна для коннектов к ней клиентов(фронтэнд сокет), вторая - для коннектов сервисов(бэкэнд сокет).
Задача та же, что и в первом примере: возведение в квадрат целых чисел. Клиент отсылает запросы, сервис (сервер) - выполняет. Запрос - беззнаковое x32 целое число (UInt32) , ответ - квадрат беззнаковое x64 целого (UInt64).
Вот наш немудрёный REQ - клиент :
program BrRR_Client; {$APPTYPE CONSOLE} // Клиент // Коннектится сокетом REQ к tcp://localhost:5559 // Шлет целое число сервису (серверу), обратно получает квадрат числа uses SysUtils, ZMQ_h; const c_ReqCnt = 100; var fContext: Pointer; fSocketRequester: Pointer; fSrcVal: Cardinal; fResultVal: UInt64; i: integer; begin fContext := zmq_ctx_new(); // Инициализация fSocketRequester := zmq_socket(fContext, ZMQ_REQ); zmq_connect(fSocketRequester, 'tcp://localhost:5559'); // Коннект к сервису Randomize(); for i := 0 to Pred(c_ReqCnt) do begin fSrcVal := Cardinal(Random(-1)); // Генерация случайного целого zmq_send(fSocketRequester, @fSrcVal, SizeOf(fSrcVal), 0); // Запрос Write('Iter: ', i, ' src=', fSrcVal); zmq_recv(fSocketRequester, @fResultVal, SizeOf(fResultVal), 0); // Ответ Writeln(' result=', fResultVal); end; zmq_close(fSocketRequester); zmq_ctx_destroy(fContext); Readln; end.
А вот - REP-сервер (сервис):
program BrRR_Service; {$APPTYPE CONSOLE} // Сервис (сервер) // Находится в известной клиентам конечной точке, связывает сокет REP с tcp:*:5560, // Получает целое число, возводит в квадрат и отправляет обратно результат uses SysUtils, ZMQ_h; var fContext: Pointer; fSocketResponder: Pointer; fSrcVal: Cardinal; fResultVal: UInt64; i: integer; begin fContext := zmq_ctx_new(); // Инициализация fSocketResponder := zmq_socket(fContext, ZMQ_REP); zmq_bind(fSocketResponder, 'tcp://*:5559'); // Привязка к конечной точке Writeln('Starting service...'); i := 0; while True do begin zmq_recv(fSocketResponder, @fSrcVal, SizeOf(fSrcVal), 0); // Запрос Write('Iter: ', i, ' src=', fSrcVal); fResultVal := UInt64(fSrcVal) * UInt64(fSrcVal); // "Полезная работа" zmq_send(fSocketResponder, @fResultVal, SizeOf(fResultVal), 0); // Ответ Writeln(' result=', fResultVal); Inc(i); end; zmq_close(fSocketResponder); zmq_ctx_destroy(fContext); Readln; end.
Пока ничего нового не видим: продублирована функциональность схемы "Вопрос - Ответ". Клиент - коннектится к конечной точке - к сервису, сервис находится в этой конечной точке и ждет запросов клиентов. "Конечная точка" - это известный адрес (Например, "tcp://localhost:5560"). То есть, реализована старая схема "Запрос-Ответ"/REQ-REP.
Будем улучшать мир. Воткнем между ними брокер.
Брокер.
Что-то новенькое: сокеты DIALER и ROUTER.
program BrRR_Broker; {$APPTYPE CONSOLE} // Брокер // Находится в известной клиентам и сервисам конечной точке, // Находится в известной клиентам конечной точке, // связывает сокет ZMQ_ROUTER с tcp:*:5559, // связывает сокет ZMQ_DEALER с tcp:*:5560, // Перекидывает сообщения между сокетами от FrontEnd к BackEnd // и обратно uses SysUtils, ZMQ_h, Math; var fContext: Pointer; fSocketFrontEnd: Pointer; fSocketBackEnd: Pointer; fZMQPoll: array[0..1] of zmq_pollitem_t; fMsg: zmq_msg_t; fDoMore: Boolean; begin fContext := zmq_ctx_new(); // Инициализация fSocketFrontEnd := zmq_socket(fContext, ZMQ_ROUTER); fSocketBackEnd := zmq_socket(fContext, ZMQ_DEALER); zmq_bind(fSocketFrontEnd, 'tcp://*:5559'); // Конечная точка для клиентов zmq_bind(fSocketBackEnd, 'tcp://*:5560'); // Кончная точка для сервисов fZMQPoll[0].socket := fSocketFrontEnd; // Инициализация пула сокетов fZMQPoll[0].fd := 0; fZMQPoll[0].events := ZMQ_POLLIN; fZMQPoll[0].revents := 0; fZMQPoll[1].socket := fSocketBackEnd; fZMQPoll[1].fd := 0; fZMQPoll[1].events := ZMQ_POLLIN; fZMQPoll[1].revents := 0; while true do begin zmq_poll(@fZMQPoll[0], Length(fZMQPoll), -1); // Проверка состояния сокетов из пула if (fZMQPoll[0].revents and ZMQ_POLLIN) <> 0 then while True do begin // Трансляция сообщний от клиента к сервису // Обработка всх частей сообщения zmq_msg_init(@fMsg); zmq_msg_recv(@fMsg, fSocketFrontEnd, 0); fDoMore := zmq_msg_more(@fMSG) <> 0; zmq_msg_send(@fMsg, fSocketBackEnd, IfThen(fDoMore, ZMQ_SNDMORE, 0)); zmq_msg_close(@fMsg); if not fDoMore then Break; // Это была последняя часть сообщения end; if (fZMQPoll[1].revents and ZMQ_POLLIN) <> 0 then while True do // Трансляция сообщний от сервиса к клиенту begin // Обработка всх частей сообщения zmq_msg_init(@fMsg); zmq_msg_recv(@fMsg, fSocketBackEnd, 0); fDoMore := zmq_msg_more(@fMSG) <> 0; zmq_msg_send(@fMsg, fSocketFrontEnd, IfThen(fDoMore, ZMQ_SNDMORE, 0)); zmq_msg_close(@fMsg); if not fDoMore then Break; // Это была последняя часть сообщения end; end; zmq_close(fSocketFrontEnd); zmq_close(fSocketBackEnd); zmq_ctx_destroy(fContext); Readln; end.Не работает система. :(
Еще бы.
Чтобы все заработало, нужно у сервиса заменить биндинг на коннект:
// zmq_bind(fSocketResponder, 'tcp://*:5559'); // Привязка к конечной точке zmq_connect (fSocketResponder, 'tcp://localhost:5560'); // Коннект к конкретной точке
Теперь все в порядке.
Брокер в асинхронном режиме слушает пул из сокетов с помощью zmq_poll(), затем читает (возможно, по частям) сообщение и транслирует его в выходной сокет. В обе стороны.
Чтение по частям реализовано "для общности". Чтобы работало с любыми сообщениями.
Интересно, что наши крошечные сообщения
zmq_send(fSocketRequester, fSrcVal, SizeOf(fSrcVal), 0); // Запрос ... zmq_recv(fSocketRequester, fResultVal, SizeOf(fResultVal), 0); // Ответ
и
zmq_recv(fSocketResponder, fSrcVal, SizeOf(fSrcVal), 0); // Запрос ... zmq_send(fSocketResponder, fResultVal, SizeOf(fResultVal), 0); // Ответ
порождают на стороне брокера каскад из нескольких составных сообщений из структур zmq_msg_t, что хорошо видно в отладчике.
Теперь можно запускать сколько хочешь сервисов и сколько хочешь клиентов - все они будут общаться через брокер.
Такой брокер для схемы "Запрос- Ответ" существенно облегчает обслуживание сети , так как клиентам нет нужды знать, где размещены сервисы, а сервисам нет нужны знать, где размещены клиенты. Единственной статической точкой является брокер.
Жизнь налаживается. :)
Теперь топология сети выглядит так:
Приведенный выше код трансляции сообщений представляется очень полезным для многих случаев для схемы "Запрос - Ответ".
Так вот, ZeroMQ есть даже специальный метод, реализующий все, что мы нако'дли в брокере.
Важно!:
function zmq_proxy( frontend, backend, capture: Pointer ): Integer;
frontend - сокет фронтэнд
backend - сокет бэкэнд
capture - сокет для перехвата сообщений (nil, если не используется)
Технически нет никакой разницы между фронтэнд и бэкэнд сокетами.
Та-да-мммм!:
Правда, классно? :)
Важно.
Видов сокетов, которые практически можно использовать в брокере:
ROUTER - DEALER
XSUB - XPUB
PULL - PUSH.
Мосты.
Вопрос: "как передать сообщения из одной подсети в другую?"
Или даже - из сети с протоколом tcp в сеть pgm.
Вариант решения - с помощью моста.
В качестве моста используем только что рассмотренный прокси (брокер сообщений).
То есть, "мост" - это маленькое приложение, которое общается одним сокетом по одному протоколу, а другим - по другому.
Ну и преобразовывает сообщения в подходящий для протокола вид, если нужно.
В качестве примера напишем крошечный прокси, который, находясь между издателем и множеством подписчиков, соединяет две разные сети.
Вернемся к примеру с метеостанцией.
Предположим, что сервер-издатель (которые измеряет температуру, давление и проч) работает во внутренней сети, часть подписчиков - тоже во внутренней. А еще часть - во внешней.
Создаем прокси, в которой фронтэнд сокет (SUB) будет общаться с внутренней сетью, а бэкэнд сокет (PUB) - с внешней.
Прокси будет подписываться фронтэнд-сокетом на события сервиса погоды и пере-публиковывать их на бэкэнд-сокете.
Прокси для Метео: "Мост В Интернет" :
program BrRR_BrokerMeteoInterNet; {$APPTYPE CONSOLE} // Брокер для проекта "Метео" // Реализует мост между интрасаетью метеосервера // и внешней сетью. Размещается во внутренней сети, // для брокера открыт "наружу" tcp порт 8100. // "Внешние" клиенты коннектятся к известной конечной точке tcp://10.1.1.0:8100 // Коннектит сокет ZMQ_XSUB с известной конечной точкой метеосервера tcp://192.168.55.210:5556 // Связывает сокет ZMQ_XPUB с tcp://10.1.1.0:8100, // Перекидывает сообщения подписки между сокетами от FrontEnd к BackEnd // и обратно uses SysUtils, ZMQ_h; var fContext: Pointer; fSocketFrontEnd: Pointer; fSocketBackEnd: Pointer; begin fContext := zmq_ctx_new(); // Инициализация fSocketFrontEnd := zmq_socket(fContext, ZMQ_XSUB); fSocketBackEnd := zmq_socket(fContext, ZMQ_XPUB); zmq_connect(fSocketFrontEnd, 'tcp://192.168.55.210:5556'); // Конечная точка метеосервиса zmq_bind(fSocketBackEnd, 'tcp://10.1.1.0:8100'); // Конечная точка для "внешних" подписчиков // zmq_bind(fSocketBackEnd, 'tcp://*:8100'); // Можно и так, главное - чтобы внешние клиенты знали адрес zmq_proxy (fSocketFrontEnd, fSocketBackEnd, nil); // Старт прокси zmq_close(fSocketFrontEnd); zmq_close(fSocketBackEnd); zmq_ctx_destroy(fContext); Readln; end.
Очень похоже на код предыдущего прокси, но используется для трансляции сообщений из одной подсети в другую. Точно так же подобный прокси - мост можно использовать, например, для подключения подписчиков в мультикаст PGM сети с издателем в TCP.
...
Теперь поговорим об обработке ошибок. (Продолжение).
Комментариев нет :
Отправить комментарий