Запрос/Ответ. Надежность на базе прокси с очередью (брокер с балансировкой нагрузки). Шаблон "Простой пират".
Шаблон "Простой Пират"
Расширим предыдущий шаблон обеспечения надежности типа "Ленивый пират", использовав прокси с очередями. Новый шаблон позволит общаться прозрачно с несколькими серверами, которые далее будем более точно называть "рабочими".
Во всех "пиратских" шаблонах рабочие не сохраняют свое состояние. Разработка системы обмена сообщениями подразумевает, что мы ничего не знаем о том, что приложениям могут понадобится какие-либо разделяемые ресурсы, хранящие состояние, вроде баз данных. Использование прокси с очередями подразумевает, что рабочие могут приходить и уходить, не зная ничего о клиентах. Если падает один из рабочих, другой заменяет его. Это - хорошая, простая топология с одним слабым местом: а именно брокером с очередью в центре, который может стать проблемой для системы управления и единой точкой отказа.
Топология сети:
Прокси с очередями построен на основе балансировщика нагрузки, рассмотренного ранее (тынц). Что в действительности нужно сделать, чтобы обработать падение или блокировку рабочего? Это удивительно, но нужно совсем немного. У нас уже есть механизм повторных запросов на клиенте. Поэтому шаблон с балансировкой нагрузки будет работать достаточно хорошо. Это соответствует философии ZeroMQ насчет того, что мы можем расширить шаблон "peer-to-peer" до шаблона типа "Запрос - Ответ" путем подключения между ними простейшего прокси.
Нам не нужен особый клиент, мы продолжим использовать клиента из шаблона "Ленивый пират". Вот код брокера с очередью, который, в общем, идентичен главной задаче из примера брокера с балансировкой нагрузки:
program Reliable_SimplePirate_Broker;
{$APPTYPE CONSOLE}
uses
SysUtils,
zmq_h,
czmq_h,
ZMQ_Utils,
Math;
// Брокер для шаблона Simple Pirate
// Такой же, как и для схемы "брокер с балансировкой нагрузки", без механизмов
// обеспечения надежности. Надежность зависит от восстановления клиентов.
// Брокер должен работать постоянно.
const
c_WORKER_READY: Byte = 1; // Сигнал готовности рабочего
procedure doMain();
var
backend: Pointer;
ctx: p_zctx_t;
frame: p_zframe_t;
frontend: Pointer;
identity: p_zframe_t;
workers: p_zlist_t;
items: array[0..1] of zmq_pollitem_t;
msg: p_zmsg_t;
rc: Integer;
begin
ctx := zctx_new();
frontend := zsocket_new(ctx, ZMQ_ROUTER);
backend := zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(frontend, 'tcp://*:5555'); // Для клиентов
zsocket_bind(backend, 'tcp://*:5556'); // Для рабочих
// Очередь доступных рабочих
workers := zlist_new();
// Тело данного примера такое же, и знакомые брокеры с балансировкой нагрузки
while true do begin
zPollItemInit(items[0], backend, 0, ZMQ_POLLIN, 0);
zPollItemInit(items[1], frontend, 0, ZMQ_POLLIN, 0);
// Опрос фронтэнда тольк ов случае, когда есть доступные рабочие
rc := zmq_poll(@items[0], IfThen(zlist_size(workers) > 0, 2, 1), -1);
if rc = -1 then
break; // Прерван
// Обработка активности рабочего в бэкенде
if (items[0].revents and ZMQ_POLLIN) <> 0 then begin
// Для балансировки нагрузки используем идентификацию рабочего
msg := zmsg_recv(backend);
if msg = nil then
break; // Прерван
identity := zmsg_unwrap(msg);
zlist_append(workers, identity);
// Переправляем сообщение клиенту, если это не сигнал готовности с_READY
frame := zmsg_first(msg);
if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then
zmsg_destroy(msg)
else
zmsg_send(msg, frontend);
end;
if (items[1].revents and ZMQ_POLLIN) <> 0 then begin
// Получам запрос от клиента, отправляем его первому свободному рабочему
msg := zmsg_recv(frontend);
if msg <> nil then begin
zmsg_wrap(msg, zlist_pop(workers));
zmsg_send(msg, backend);
end
end
end;
// При выходе выполняем очистку
while zlist_size(workers) > 0 do begin
frame := zlist_pop(workers);
zframe_destroy(frame);
end;
zlist_destroy(workers);
zctx_destroy(ctx);
end;
begin
doMain();
Readln;
end.
А это - код рабочего, в которого превратился сервер "Ленивого пирата", немного адаптированный под шаблон балансировщика нагрузки (с использованием сокета REQ и сигнала "ГОТОВ"):
program Reliable_SimplePirate_Worker;
{$APPTYPE CONSOLE}
uses
SysUtils
, zmq_h
, czmq_h
, ZMQ_Utils
, Math
;
// Рабочий для шаблона Simple Pirate
// Подключается сокетом REQ к tcp://*:5556
// Реализация рабочего для случая балансировки нагрузки
const
c_WORKER_READY: Byte = 1; // Сигнал готовности рабочего
procedure doMain();
var
ctx: p_zctx_t;
cycles: Integer;
frame: p_zframe_t;
worker: Pointer;
identity: string;
msg: p_zmsg_t;
begin
ctx := zctx_new();
worker := zsocket_new(ctx, ZMQ_REQ);
// Для облегчения трассировки устанавливаем собственное произвольне значение
Randomize();
identity := Format('%4.0x-%4.0x', [Random($10000), Random($10000)]);
zmq_setsockopt(worker, ZMQ_IDENTITY, PChar(identity), Length(identity));
zsocket_connect(worker, 'tcp://localhost:5556');
// Сообщаем брокеру о готовности к работе
z_Log(Format('I: (%s)worker ready', [identity]));
frame := zframe_new(@c_WORKER_READY, 1);
zframe_send(frame, worker, 0);
cycles := 0;
while True do begin
msg := zmsg_recv(worker);
if msg = nil then
break; // Прерван
// После нескольких циклов - имитация различных проблем
Inc(cycles);
if (cycles > 3) and (Random(5) = 0) then begin
z_Log(Format('I: (%s) simulating a crash', [identity]));
zmsg_destroy(msg);
break;
end
else
if (cycles > 3) and (Random(5) = 0) then begin
z_Log(Format('I: (%s) simulating CPU overload', [identity]));
sleep(2000);
if (zctx_interrupted = 1) then
break; // Прерван
end;
z_Log(Format('I: (%s)normal reply', [identity]));
sleep(1000); // Имитация "тяжелой" работы
zmsg_send(msg, worker);
end;
zctx_destroy(&ctx);
end;
begin
doMain();
// Readln;
end.
Для тестирования запускаем произвольное количество рабочих, клиентов "Ленивый пират" брокер, в произвольном порядке. Наблюдаем, как в итоге все рабочие падают, а клиенты пытаются достучаться до них. Брокер с очередью никогда не останавливается, и мы может запускать рабочих и клиентов в произвольном количестве.
Эта модель работает с любым числом клиентов и рабочих.
Дальше мы рассмотрим супер-надежную схему с хартбитингом (шаблон "Пират - паранок"). (Продолжение)

Комментариев нет :
Отправить комментарий