Запрос/Ответ. Высоконадежный брокер с очередью и хартбитингом (шаблон "Пират - параноик").
Шаблон "Пират - параноик"
Шаблон "Простой пират" достаточно хорош особенно тем, что он является простой комбинацией двух уже существующих шаблонов. Тем не менее, и у него тоже есть некоторые недостатки:
- Он перестает работать в случае, когда очередь (брокер) падает и перезапускается. Клиент восстановится, а рабочие - нет. Хотя ZeroMQ и выполнит автоматический реконнект после перезапуска очереди, рабочие не пошлют сигнала "ГОТОВ" и, следовательно, не будут считаться доступными. Для исправления реализуем хартбитинг от очереди к рабочим так, чтобы рабочий смог определить, что очередь стала недоступной.
- Очередь не обнаруживает отказов рабочих, поэтому, если рабочие падают во время простоя, брокер не может удалить таких рабочих их очереди доступных рабочих до тех пор, пока брокер не не пошлет такому рабочему запрос. Клиент будет ждать и делать перезапросы в никуда. Это не очень большая проблема, но это неприятно. Чтобы все работало правильно, реализуем хартбитинг от рабочего к очереди так, чтобы очередь могла определять потерянных рабочих на любом этапе.
Топология сети для схемы "Пират - параноик":
Ранее для рабочего мы использовали сокет REQ. Для реализации шаблона "Пират - параноик" мы перейдем на сокет DEALER. Это даст возможность отправлять и принимать сообщения в любые моменты времени, а не просто в последовательности "запрос/ответ", как того требует жесткая архитектура сокета REQ. Недостатком сокета DEALER является необходимость ручного управления построением структуры конверта сообщения. (См. Схема "Издатель - Подписчик", подробности, структура конверта).
Мы продолжаем использовать клиента из шаблона "Ленивый пират".
Код очереди (брокера) для шаблона "Пират - параноик":
program Reliable_ParanoidPirate_Broker;
{$APPTYPE CONSOLE}
uses
SysUtils
, zmq_h
, czmq_h
, zmq_utils
, Math
;
// Очередь (брокер) для шаблона "Пират - параноик"
const
c_HEARTBEAT_LIVENESS = 3; // Обычно 3-5
c_HEARTBEAT_INTERVAL = 1000; // msecs
// Константы для протокола "Пират- параноик"
c_PPP_READY: Byte = 1; // Сигнал готовности рабочего
c_PPP_HEARTBEAT: Byte = 2; // Сигнал хартбитинга рабочего
type
TWorker = class
// Класс, описывающий рабочего.
public
identity: p_zframe_t; // Идентификатор рабочего
id_string: PChar; // Строковое представление идентификатора
expiry: int64_t; // Момент смерти
procedure Ready(workers: p_zlist_t);
constructor Create(a_identity: p_zframe_t);
destructor Destroy; override;
end;
// Метод s_workers_next возвращает идентификатор следующего доступного рабочего:
function
s_workers_next(workers: p_zlist_t): p_zframe_t;
var
worker: TWorker;
begin
worker := zlist_pop(workers);
assert(worker <> nil);
Result := worker.identity;
worker.identity := nil;
worker.Free;
end;
procedure s_workers_purge(workers: p_zlist_t);
// Метод s_workers_purge находит и уничтожает все объекты мертвых рабочих.
// Обрабатка идет от старейших к недавно использованным. Таким образом, мы
// останавливаемся на первом доступном рабочем:
var
worker: TWorker;
begin
worker := zlist_first(workers);
while worker <> nil do begin
if zclock_time() < worker.expiry then
break; // Рабочий жив, используем
zlist_remove(workers, worker);
worker.Free;
worker := zlist_first(workers);
end;
end;
procedure doMain();
// Основаня задача представляет собой балансировщик нагрузки с
// харбитингом к рабочим. Таким образом, мы можем обнаружить упавшие
// или зависшие задачи рабочих:
var
backend: Pointer;
ctx: p_zctx_t;
frame: p_zframe_t;
frontend: Pointer;
heartbeat_at: uint64_t;
identity: p_zframe_t;
items: array[0..1] of zmq_pollitem_t;
msg: p_zmsg_t;
rc: Integer;
worker: TWorker;
workers: p_zlist_t;
begin
ctx := zctx_new();
frontend := zsocket_new(ctx, ZMQ_ROUTER);
backend := zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(frontend, 'tcp://*:5555'); // Для клиентов
zsocket_bind(backend, 'tcp://*:5556'); // Для рабочих
// Список доступных рабочих
workers := zlist_new();
// Посылаем хартбиты через одинаковые интервалы времени
heartbeat_at := zclock_time() + c_HEARTBEAT_INTERVAL;
while True do begin
zPollItemInit(items[0], backend, 0, ZMQ_POLLIN, 0);
zPollItemInit(items[1], frontend, 0, ZMQ_POLLIN, 0);
// Опрашиваем фронтенд только тогда, когда есть доступные рабочие
rc := zmq_poll(@items[0], IfThen(zlist_size(workers) > 0, 2, 1),
c_HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if rc = -1 then
break; // Прерван
// Обработка активности рабочена на бэкэнде
if (items[0].revents and ZMQ_POLLIN) <> 0 then begin
// Используем идентификацию рабочего для балансировщика нагрузки
msg := zmsg_recv(backend);
if msg = nil then
break; // Прерван
// Любое проявление активноси рабочего подразумевает его готовность
identity := zmsg_unwrap(msg);
worker := TWorker.Create(identity);
worker.Ready(workers);
// Валидация управляющего сообщения, или возврат ответа клиенту
if zmsg_size(msg) = 1 then begin
frame := zmsg_first(msg);
if not (CompareMem(zframe_data(frame), @c_PPP_READY, 1)
or CompareMem(zframe_data(frame), @c_PPP_HEARTBEAT, 1)) then begin
z_Log('E: invalid message from worker');
zmsg_print(msg);
end;
zmsg_destroy(msg);
end
else
zmsg_send(msg, frontend);
end;
if (items[1].revents and ZMQ_POLLIN) <> 0 then begin
// Получаем следующий запрос клиента и распределяем его следующему рабочему
msg := zmsg_recv(frontend);
if msg = nil then
break; // Прерван
zmsg_push(msg, s_workers_next(workers));
zmsg_send(msg, backend);
end;
// Обрабатываем хартбитинг после любой активности сокета. Сначала отправляем
// хартбиты всем простаивающим в данный момент времени рабочим. Затем очищаемся
// от мертвых рабочих:
if (zclock_time() >= heartbeat_at) then begin
worker := zlist_first(workers);
while worker <> nil do begin
// Строим пакет сообщения. ZFRAME_REUSE - без разрушения фрейма
zframe_send(worker.identity, backend, c_ZFRAME_REUSE + c_ZFRAME_MORE);
frame := zframe_new(@c_PPP_HEARTBEAT, 1);
zframe_send(frame, backend, 0);
worker := zlist_next(workers);
end;
// Следующий хартбит
heartbeat_at := zclock_time() + c_HEARTBEAT_INTERVAL;
end;
s_workers_purge(workers); // Сбрасываем мертвых рабочих
end; // while
// Завершение работы, очистка.
while zlist_size(workers) > 0 do begin
worker := zlist_pop(workers);
worker.Free;
end;
zlist_destroy(workers);
zctx_destroy(ctx);
end;
constructor TWorker.Create(a_identity: p_zframe_t);
begin
// Создание объекта рабочего
identity := a_identity;
id_string := zframe_strhex(identity);
expiry := zclock_time()
+ c_HEARTBEAT_INTERVAL * c_HEARTBEAT_LIVENESS;
end;
destructor TWorker.Destroy;
begin
// Разрушение объекта рабочего, в том числе идентифицирущего фрейма
// и строкового идентификатора.
// Разрушаем с помощью методов czmq, так как эти объекты создавались
// в недельфийском диспетчере памяти
zframe_destroy(identity);
zstr_free(id_string);
inherited;
end;
procedure TWorker.Ready(workers: p_zlist_t);
var
worker: TWorker;
begin
// Метод ready помещает рабочего в конец списка готовых рабочих:
worker := zlist_first(workers);
while worker <> nil do begin
if string(id_string) = string(worker.id_string) then begin
zlist_remove(workers, worker);
worker.Free;
break;
end;
worker := zlist_next(workers);
end;
zlist_append(workers, self);
end;
begin
IsMultiThread := True;
doMain();
readln;
end.
Очередь представляет собой усложненный шаблон балансировки нагрузки с хартбитингом рабочих. Хартбитинг - не самая простая для понимания вещь. Однако, все станет совершенно ясно через пару секунд.
Код рабочего для шаблона "Пират - параноик":
program Reliable_ParanoidPirate_Worker;
{$APPTYPE CONSOLE}
uses
SysUtils
, zmq_h
, czmq_h
, zmq_utils
, Math
;
// Рабочий для шаблона "Пират - параноик"
const
c_HEARTBEAT_LIVENESS = 3; // Нормально, если 3-5
c_HEARTBEAT_INTERVAL = 1000; // msecs
c_INTERVAL_INIT = 1000; // Начальный интервал реконнекта
c_INTERVAL_MAX = 32000; // После экпоненциальной отсрочки
// Константы протокола "Пират - параноик"
c_PPP_READY: Byte = 1; // Сигнал готовности рабочего
c_PPP_HEARTBEAT: Byte = 2; // Сигнал хартбитинга рабочего
// Вспомогательная функция, возвращающая заново настроенный сокет,
// подключенный к очереди шаблоона "Параллельный пират"
function
s_worker_socket(ctx: p_zctx_t): pointer;
var
frame: p_zframe_t;
begin
Result := zsocket_new(ctx, ZMQ_DEALER);
zsocket_connect(Result, 'tcp://localhost:5556');
// Сообщаем очереди о готовности к работе
z_Log('I: worker ready');
frame := zframe_new(@c_PPP_READY, 1);
zframe_send(frame, Result, 0);
end;
procedure doMain;
var
ctx: p_zctx_t;
cycles: Integer;
frame: p_zframe_t;
heartbeat_at: uint64_t;
interval: size_t;
items: zmq_pollitem_t;
liveness: size_t;
msg: p_zmsg_t;
rc: Integer;
worker: pointer;
begin
// Протокол "Пират - параноик" (Paranoid Pirate Protocol - PPP) со стороны
// рабочего выполнен в виде одной задачи. В данном случае особенно интересна
// реализация хартбитинга, которая позволяет рабочему определять доступность
// очереди (и наоборот)
ctx := zctx_new();
worker := s_worker_socket(ctx);
// Если индикатор живучести liveness обнуляется, считаем, что связь
// с очередью прервалась
liveness := c_HEARTBEAT_LIVENESS;
interval := c_INTERVAL_INIT;
// Отсылает хартбиты через равные промежутки времени
heartbeat_at := zclock_time() + c_HEARTBEAT_INTERVAL;
Randomize();
cycles := 0;
while true do begin
zPollItemInit(items, worker, 0, ZMQ_POLLIN, 0);
rc := zmq_poll(@items, 1, c_HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if rc = -1 then
break; // Прерван
if (items.revents and ZMQ_POLLIN) <> 0 then begin
// Получаем сообщение. Если оно состоит из
// - 3х частей (конверт + содержимое) -> это запрос
// - 1й части (хартбит) -> это хартбит
msg := zmsg_recv(worker);
if msg = nil then
break; // Прерван
// Тестируем живучесть реализации очереди. Для этого имитируем различные
// проблемы: падение рабочего или подвисание рабочего.
// Делаем это после выполнения нескольких циклов таки образом, чтобы система
// изначально смогла запуститься:
if zmsg_size(msg) = 3 then begin
Inc(cycles);
if (cycles > 3) and (Random(5) = 0) then begin
z_Log('I: simulating a crash');
zmsg_destroy(msg);
break;
end
else
if (cycles > 3) and (Random(5) = 0) then begin
z_Log('I: simulating CPU overload');
sleep(3000);
if zctx_interrupted = 1 then
break;
end;
z_Log('I: normal reply');
zmsg_send(msg, worker);
liveness := c_HEARTBEAT_LIVENESS;
sleep(1000); // Do some heavy work
if zctx_interrupted = 1 then
break;
end
else
// При получения от очереди сообщения харбитинга считаем, что очередь
// (была) жива, поэтому сбрасываем индикатор живучести liveness:
if zmsg_size(msg) = 1 then begin
frame := zmsg_first(msg);
if CompareMem(zframe_data(frame), @c_PPP_HEARTBEAT, 1) then
liveness := c_HEARTBEAT_LIVENESS
else begin
z_Log('E: invalid message');
zmsg_print(msg);
end;
zmsg_destroy(&msg);
end
else begin
Z_log('E: invalid message');
zmsg_print(msg);
end;
interval := c_INTERVAL_INIT;
end
else begin
// Если очередь так и не прислала хартбитинг, уничтожаем сокет и реконнектимся.
// Вот такой простой и суровый способ отбросить все отправленные сообщения:
Dec(liveness);
if liveness = 0 then begin
Z_log('W: heartbeat failure, can''t reach queue');
Z_log(Format('W: reconnecting in %ud msec...', [interval]));
zclock_sleep(interval);
if (interval < c_INTERVAL_MAX) then
interval := interval * 2;
zsocket_destroy(ctx, worker);
worker := s_worker_socket(ctx);
liveness := c_HEARTBEAT_LIVENESS;
end;
end;
// Отправить хартбит в очередь, если пришло время
if (zclock_time() > heartbeat_at) then begin
heartbeat_at := zclock_time() + c_HEARTBEAT_INTERVAL;
Z_log('I: worker heartbeat');
frame := zframe_new(@c_PPP_HEARTBEAT, 1);
zframe_send(frame, worker, 0);
end
end;
zctx_destroy(ctx);
end;
begin
doMain;
Readln;
end.
Комментарии к примеру:
- Как и раньше, в коде присутствует имитация отказов. Это делает код довольно сложным для отладки и опасным для повторного использования. Когда возникнет надобность в отладке такого кода, следует отключить имитацию отказов.
- Рабочий использует стратегию реконнекта, похожую на ту, которая используется в клиента шаблона "Ленивый пират", но с двумя важными отличиями: (a) реконнект выполняется через экспоненциально возрастающие промежутки времени, и (b) он выполняет его бесконечно (тогда как клиент после сбоя делает это фиксированное число раз).
start Reliable_ParanoidPirate_Broker.exe
for /l %%i in (1,1,5) do start Reliable_ParanoidPirate_Worker.exe
timeout /t 1
start Reliable_LazyPirate_Client.exe
В первой строке запускается очередь (брокера) Reliable_ParanoidPirate_Broker.exe.
Во второй строке запускается пять экземпляров рабочих Reliable_ParanoidPirate_Worker.exe.
В третьей строке задается пауза в 1 секунду для того, чтобы рабочие подключились к очереди.
В четвертой строке запускается клиент.
После запуска мы наблюдаем рабочих, имитирующих отказы (зависания и падения), и клиента, который в итоге отвалится, когда не останется доступных рабочих. Мы можем остановить и перезапустить очередь, при этом клиент рабочие выполнят реконнект и подхватят работу. И неважно, что делается с очередью и с рабочими: клиент никогда не отвалится, так как работает либо вся цепочка, либо клиент уходит в отказ.
Далее мы рассмотрим харбитинг более детально. (Продолжение)


Комментариев нет :
Отправить комментарий