Запрос/Ответ. Надежность на базе прокси с очередью (брокер с балансировкой нагрузки). Шаблон "Простой пират".
Шаблон "Простой Пират"
Расширим предыдущий шаблон обеспечения надежности типа "Ленивый пират", использовав прокси с очередями. Новый шаблон позволит общаться прозрачно с несколькими серверами, которые далее будем более точно называть "рабочими".
Во всех "пиратских" шаблонах рабочие не сохраняют свое состояние. Разработка системы обмена сообщениями подразумевает, что мы ничего не знаем о том, что приложениям могут понадобится какие-либо разделяемые ресурсы, хранящие состояние, вроде баз данных. Использование прокси с очередями подразумевает, что рабочие могут приходить и уходить, не зная ничего о клиентах. Если падает один из рабочих, другой заменяет его. Это - хорошая, простая топология с одним слабым местом: а именно брокером с очередью в центре, который может стать проблемой для системы управления и единой точкой отказа.
Топология сети:
Прокси с очередями построен на основе балансировщика нагрузки, рассмотренного ранее (тынц). Что в действительности нужно сделать, чтобы обработать падение или блокировку рабочего? Это удивительно, но нужно совсем немного. У нас уже есть механизм повторных запросов на клиенте. Поэтому шаблон с балансировкой нагрузки будет работать достаточно хорошо. Это соответствует философии ZeroMQ насчет того, что мы можем расширить шаблон "peer-to-peer" до шаблона типа "Запрос - Ответ" путем подключения между ними простейшего прокси.
Нам не нужен особый клиент, мы продолжим использовать клиента из шаблона "Ленивый пират". Вот код брокера с очередью, который, в общем, идентичен главной задаче из примера брокера с балансировкой нагрузки:
program Reliable_SimplePirate_Broker; {$APPTYPE CONSOLE} uses SysUtils, zmq_h, czmq_h, ZMQ_Utils, Math; // Брокер для шаблона Simple Pirate // Такой же, как и для схемы "брокер с балансировкой нагрузки", без механизмов // обеспечения надежности. Надежность зависит от восстановления клиентов. // Брокер должен работать постоянно. const c_WORKER_READY: Byte = 1; // Сигнал готовности рабочего procedure doMain(); var backend: Pointer; ctx: p_zctx_t; frame: p_zframe_t; frontend: Pointer; identity: p_zframe_t; workers: p_zlist_t; items: array[0..1] of zmq_pollitem_t; msg: p_zmsg_t; rc: Integer; begin ctx := zctx_new(); frontend := zsocket_new(ctx, ZMQ_ROUTER); backend := zsocket_new(ctx, ZMQ_ROUTER); zsocket_bind(frontend, 'tcp://*:5555'); // Для клиентов zsocket_bind(backend, 'tcp://*:5556'); // Для рабочих // Очередь доступных рабочих workers := zlist_new(); // Тело данного примера такое же, и знакомые брокеры с балансировкой нагрузки while true do begin zPollItemInit(items[0], backend, 0, ZMQ_POLLIN, 0); zPollItemInit(items[1], frontend, 0, ZMQ_POLLIN, 0); // Опрос фронтэнда тольк ов случае, когда есть доступные рабочие rc := zmq_poll(@items[0], IfThen(zlist_size(workers) > 0, 2, 1), -1); if rc = -1 then break; // Прерван // Обработка активности рабочего в бэкенде if (items[0].revents and ZMQ_POLLIN) <> 0 then begin // Для балансировки нагрузки используем идентификацию рабочего msg := zmsg_recv(backend); if msg = nil then break; // Прерван identity := zmsg_unwrap(msg); zlist_append(workers, identity); // Переправляем сообщение клиенту, если это не сигнал готовности с_READY frame := zmsg_first(msg); if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then zmsg_destroy(msg) else zmsg_send(msg, frontend); end; if (items[1].revents and ZMQ_POLLIN) <> 0 then begin // Получам запрос от клиента, отправляем его первому свободному рабочему msg := zmsg_recv(frontend); if msg <> nil then begin zmsg_wrap(msg, zlist_pop(workers)); zmsg_send(msg, backend); end end end; // При выходе выполняем очистку while zlist_size(workers) > 0 do begin frame := zlist_pop(workers); zframe_destroy(frame); end; zlist_destroy(workers); zctx_destroy(ctx); end; begin doMain(); Readln; end.
А это - код рабочего, в которого превратился сервер "Ленивого пирата", немного адаптированный под шаблон балансировщика нагрузки (с использованием сокета REQ и сигнала "ГОТОВ"):
program Reliable_SimplePirate_Worker; {$APPTYPE CONSOLE} uses SysUtils , zmq_h , czmq_h , ZMQ_Utils , Math ; // Рабочий для шаблона Simple Pirate // Подключается сокетом REQ к tcp://*:5556 // Реализация рабочего для случая балансировки нагрузки const c_WORKER_READY: Byte = 1; // Сигнал готовности рабочего procedure doMain(); var ctx: p_zctx_t; cycles: Integer; frame: p_zframe_t; worker: Pointer; identity: string; msg: p_zmsg_t; begin ctx := zctx_new(); worker := zsocket_new(ctx, ZMQ_REQ); // Для облегчения трассировки устанавливаем собственное произвольне значение Randomize(); identity := Format('%4.0x-%4.0x', [Random($10000), Random($10000)]); zmq_setsockopt(worker, ZMQ_IDENTITY, PChar(identity), Length(identity)); zsocket_connect(worker, 'tcp://localhost:5556'); // Сообщаем брокеру о готовности к работе z_Log(Format('I: (%s)worker ready', [identity])); frame := zframe_new(@c_WORKER_READY, 1); zframe_send(frame, worker, 0); cycles := 0; while True do begin msg := zmsg_recv(worker); if msg = nil then break; // Прерван // После нескольких циклов - имитация различных проблем Inc(cycles); if (cycles > 3) and (Random(5) = 0) then begin z_Log(Format('I: (%s) simulating a crash', [identity])); zmsg_destroy(msg); break; end else if (cycles > 3) and (Random(5) = 0) then begin z_Log(Format('I: (%s) simulating CPU overload', [identity])); sleep(2000); if (zctx_interrupted = 1) then break; // Прерван end; z_Log(Format('I: (%s)normal reply', [identity])); sleep(1000); // Имитация "тяжелой" работы zmsg_send(msg, worker); end; zctx_destroy(&ctx); end; begin doMain(); // Readln; end.
Для тестирования запускаем произвольное количество рабочих, клиентов "Ленивый пират" брокер, в произвольном порядке. Наблюдаем, как в итоге все рабочие падают, а клиенты пытаются достучаться до них. Брокер с очередью никогда не останавливается, и мы может запускать рабочих и клиентов в произвольном количестве.
Эта модель работает с любым числом клиентов и рабочих.
Дальше мы рассмотрим супер-надежную схему с хартбитингом (шаблон "Пират - паранок"). (Продолжение)
Комментариев нет :
Отправить комментарий