Запрос/Ответ. Высоконадежный брокер с очередью и хартбитингом (шаблон "Пират - параноик").
Шаблон "Пират - параноик"
Шаблон "Простой пират" достаточно хорош особенно тем, что он является простой комбинацией двух уже существующих шаблонов. Тем не менее, и у него тоже есть некоторые недостатки:
- Он перестает работать в случае, когда очередь (брокер) падает и перезапускается. Клиент восстановится, а рабочие - нет. Хотя ZeroMQ и выполнит автоматический реконнект после перезапуска очереди, рабочие не пошлют сигнала "ГОТОВ" и, следовательно, не будут считаться доступными. Для исправления реализуем хартбитинг от очереди к рабочим так, чтобы рабочий смог определить, что очередь стала недоступной.
- Очередь не обнаруживает отказов рабочих, поэтому, если рабочие падают во время простоя, брокер не может удалить таких рабочих их очереди доступных рабочих до тех пор, пока брокер не не пошлет такому рабочему запрос. Клиент будет ждать и делать перезапросы в никуда. Это не очень большая проблема, но это неприятно. Чтобы все работало правильно, реализуем хартбитинг от рабочего к очереди так, чтобы очередь могла определять потерянных рабочих на любом этапе.
Топология сети для схемы "Пират - параноик":
Ранее для рабочего мы использовали сокет REQ. Для реализации шаблона "Пират - параноик" мы перейдем на сокет DEALER. Это даст возможность отправлять и принимать сообщения в любые моменты времени, а не просто в последовательности "запрос/ответ", как того требует жесткая архитектура сокета REQ. Недостатком сокета DEALER является необходимость ручного управления построением структуры конверта сообщения. (См. Схема "Издатель - Подписчик", подробности, структура конверта).
Мы продолжаем использовать клиента из шаблона "Ленивый пират".
Код очереди (брокера) для шаблона "Пират - параноик":
program Reliable_ParanoidPirate_Broker; {$APPTYPE CONSOLE} uses SysUtils , zmq_h , czmq_h , zmq_utils , Math ; // Очередь (брокер) для шаблона "Пират - параноик" const c_HEARTBEAT_LIVENESS = 3; // Обычно 3-5 c_HEARTBEAT_INTERVAL = 1000; // msecs // Константы для протокола "Пират- параноик" c_PPP_READY: Byte = 1; // Сигнал готовности рабочего c_PPP_HEARTBEAT: Byte = 2; // Сигнал хартбитинга рабочего type TWorker = class // Класс, описывающий рабочего. public identity: p_zframe_t; // Идентификатор рабочего id_string: PChar; // Строковое представление идентификатора expiry: int64_t; // Момент смерти procedure Ready(workers: p_zlist_t); constructor Create(a_identity: p_zframe_t); destructor Destroy; override; end; // Метод s_workers_next возвращает идентификатор следующего доступного рабочего: function s_workers_next(workers: p_zlist_t): p_zframe_t; var worker: TWorker; begin worker := zlist_pop(workers); assert(worker <> nil); Result := worker.identity; worker.identity := nil; worker.Free; end; procedure s_workers_purge(workers: p_zlist_t); // Метод s_workers_purge находит и уничтожает все объекты мертвых рабочих. // Обрабатка идет от старейших к недавно использованным. Таким образом, мы // останавливаемся на первом доступном рабочем: var worker: TWorker; begin worker := zlist_first(workers); while worker <> nil do begin if zclock_time() < worker.expiry then break; // Рабочий жив, используем zlist_remove(workers, worker); worker.Free; worker := zlist_first(workers); end; end; procedure doMain(); // Основаня задача представляет собой балансировщик нагрузки с // харбитингом к рабочим. Таким образом, мы можем обнаружить упавшие // или зависшие задачи рабочих: var backend: Pointer; ctx: p_zctx_t; frame: p_zframe_t; frontend: Pointer; heartbeat_at: uint64_t; identity: p_zframe_t; items: array[0..1] of zmq_pollitem_t; msg: p_zmsg_t; rc: Integer; worker: TWorker; workers: p_zlist_t; begin ctx := zctx_new(); frontend := zsocket_new(ctx, ZMQ_ROUTER); backend := zsocket_new(ctx, ZMQ_ROUTER); zsocket_bind(frontend, 'tcp://*:5555'); // Для клиентов zsocket_bind(backend, 'tcp://*:5556'); // Для рабочих // Список доступных рабочих workers := zlist_new(); // Посылаем хартбиты через одинаковые интервалы времени heartbeat_at := zclock_time() + c_HEARTBEAT_INTERVAL; while True do begin zPollItemInit(items[0], backend, 0, ZMQ_POLLIN, 0); zPollItemInit(items[1], frontend, 0, ZMQ_POLLIN, 0); // Опрашиваем фронтенд только тогда, когда есть доступные рабочие rc := zmq_poll(@items[0], IfThen(zlist_size(workers) > 0, 2, 1), c_HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC); if rc = -1 then break; // Прерван // Обработка активности рабочена на бэкэнде if (items[0].revents and ZMQ_POLLIN) <> 0 then begin // Используем идентификацию рабочего для балансировщика нагрузки msg := zmsg_recv(backend); if msg = nil then break; // Прерван // Любое проявление активноси рабочего подразумевает его готовность identity := zmsg_unwrap(msg); worker := TWorker.Create(identity); worker.Ready(workers); // Валидация управляющего сообщения, или возврат ответа клиенту if zmsg_size(msg) = 1 then begin frame := zmsg_first(msg); if not (CompareMem(zframe_data(frame), @c_PPP_READY, 1) or CompareMem(zframe_data(frame), @c_PPP_HEARTBEAT, 1)) then begin z_Log('E: invalid message from worker'); zmsg_print(msg); end; zmsg_destroy(msg); end else zmsg_send(msg, frontend); end; if (items[1].revents and ZMQ_POLLIN) <> 0 then begin // Получаем следующий запрос клиента и распределяем его следующему рабочему msg := zmsg_recv(frontend); if msg = nil then break; // Прерван zmsg_push(msg, s_workers_next(workers)); zmsg_send(msg, backend); end; // Обрабатываем хартбитинг после любой активности сокета. Сначала отправляем // хартбиты всем простаивающим в данный момент времени рабочим. Затем очищаемся // от мертвых рабочих: if (zclock_time() >= heartbeat_at) then begin worker := zlist_first(workers); while worker <> nil do begin // Строим пакет сообщения. ZFRAME_REUSE - без разрушения фрейма zframe_send(worker.identity, backend, c_ZFRAME_REUSE + c_ZFRAME_MORE); frame := zframe_new(@c_PPP_HEARTBEAT, 1); zframe_send(frame, backend, 0); worker := zlist_next(workers); end; // Следующий хартбит heartbeat_at := zclock_time() + c_HEARTBEAT_INTERVAL; end; s_workers_purge(workers); // Сбрасываем мертвых рабочих end; // while // Завершение работы, очистка. while zlist_size(workers) > 0 do begin worker := zlist_pop(workers); worker.Free; end; zlist_destroy(workers); zctx_destroy(ctx); end; constructor TWorker.Create(a_identity: p_zframe_t); begin // Создание объекта рабочего identity := a_identity; id_string := zframe_strhex(identity); expiry := zclock_time() + c_HEARTBEAT_INTERVAL * c_HEARTBEAT_LIVENESS; end; destructor TWorker.Destroy; begin // Разрушение объекта рабочего, в том числе идентифицирущего фрейма // и строкового идентификатора. // Разрушаем с помощью методов czmq, так как эти объекты создавались // в недельфийском диспетчере памяти zframe_destroy(identity); zstr_free(id_string); inherited; end; procedure TWorker.Ready(workers: p_zlist_t); var worker: TWorker; begin // Метод ready помещает рабочего в конец списка готовых рабочих: worker := zlist_first(workers); while worker <> nil do begin if string(id_string) = string(worker.id_string) then begin zlist_remove(workers, worker); worker.Free; break; end; worker := zlist_next(workers); end; zlist_append(workers, self); end; begin IsMultiThread := True; doMain(); readln; end.
Очередь представляет собой усложненный шаблон балансировки нагрузки с хартбитингом рабочих. Хартбитинг - не самая простая для понимания вещь. Однако, все станет совершенно ясно через пару секунд.
Код рабочего для шаблона "Пират - параноик":
program Reliable_ParanoidPirate_Worker; {$APPTYPE CONSOLE} uses SysUtils , zmq_h , czmq_h , zmq_utils , Math ; // Рабочий для шаблона "Пират - параноик" const c_HEARTBEAT_LIVENESS = 3; // Нормально, если 3-5 c_HEARTBEAT_INTERVAL = 1000; // msecs c_INTERVAL_INIT = 1000; // Начальный интервал реконнекта c_INTERVAL_MAX = 32000; // После экпоненциальной отсрочки // Константы протокола "Пират - параноик" c_PPP_READY: Byte = 1; // Сигнал готовности рабочего c_PPP_HEARTBEAT: Byte = 2; // Сигнал хартбитинга рабочего // Вспомогательная функция, возвращающая заново настроенный сокет, // подключенный к очереди шаблоона "Параллельный пират" function s_worker_socket(ctx: p_zctx_t): pointer; var frame: p_zframe_t; begin Result := zsocket_new(ctx, ZMQ_DEALER); zsocket_connect(Result, 'tcp://localhost:5556'); // Сообщаем очереди о готовности к работе z_Log('I: worker ready'); frame := zframe_new(@c_PPP_READY, 1); zframe_send(frame, Result, 0); end; procedure doMain; var ctx: p_zctx_t; cycles: Integer; frame: p_zframe_t; heartbeat_at: uint64_t; interval: size_t; items: zmq_pollitem_t; liveness: size_t; msg: p_zmsg_t; rc: Integer; worker: pointer; begin // Протокол "Пират - параноик" (Paranoid Pirate Protocol - PPP) со стороны // рабочего выполнен в виде одной задачи. В данном случае особенно интересна // реализация хартбитинга, которая позволяет рабочему определять доступность // очереди (и наоборот) ctx := zctx_new(); worker := s_worker_socket(ctx); // Если индикатор живучести liveness обнуляется, считаем, что связь // с очередью прервалась liveness := c_HEARTBEAT_LIVENESS; interval := c_INTERVAL_INIT; // Отсылает хартбиты через равные промежутки времени heartbeat_at := zclock_time() + c_HEARTBEAT_INTERVAL; Randomize(); cycles := 0; while true do begin zPollItemInit(items, worker, 0, ZMQ_POLLIN, 0); rc := zmq_poll(@items, 1, c_HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC); if rc = -1 then break; // Прерван if (items.revents and ZMQ_POLLIN) <> 0 then begin // Получаем сообщение. Если оно состоит из // - 3х частей (конверт + содержимое) -> это запрос // - 1й части (хартбит) -> это хартбит msg := zmsg_recv(worker); if msg = nil then break; // Прерван // Тестируем живучесть реализации очереди. Для этого имитируем различные // проблемы: падение рабочего или подвисание рабочего. // Делаем это после выполнения нескольких циклов таки образом, чтобы система // изначально смогла запуститься: if zmsg_size(msg) = 3 then begin Inc(cycles); if (cycles > 3) and (Random(5) = 0) then begin z_Log('I: simulating a crash'); zmsg_destroy(msg); break; end else if (cycles > 3) and (Random(5) = 0) then begin z_Log('I: simulating CPU overload'); sleep(3000); if zctx_interrupted = 1 then break; end; z_Log('I: normal reply'); zmsg_send(msg, worker); liveness := c_HEARTBEAT_LIVENESS; sleep(1000); // Do some heavy work if zctx_interrupted = 1 then break; end else // При получения от очереди сообщения харбитинга считаем, что очередь // (была) жива, поэтому сбрасываем индикатор живучести liveness: if zmsg_size(msg) = 1 then begin frame := zmsg_first(msg); if CompareMem(zframe_data(frame), @c_PPP_HEARTBEAT, 1) then liveness := c_HEARTBEAT_LIVENESS else begin z_Log('E: invalid message'); zmsg_print(msg); end; zmsg_destroy(&msg); end else begin Z_log('E: invalid message'); zmsg_print(msg); end; interval := c_INTERVAL_INIT; end else begin // Если очередь так и не прислала хартбитинг, уничтожаем сокет и реконнектимся. // Вот такой простой и суровый способ отбросить все отправленные сообщения: Dec(liveness); if liveness = 0 then begin Z_log('W: heartbeat failure, can''t reach queue'); Z_log(Format('W: reconnecting in %ud msec...', [interval])); zclock_sleep(interval); if (interval < c_INTERVAL_MAX) then interval := interval * 2; zsocket_destroy(ctx, worker); worker := s_worker_socket(ctx); liveness := c_HEARTBEAT_LIVENESS; end; end; // Отправить хартбит в очередь, если пришло время if (zclock_time() > heartbeat_at) then begin heartbeat_at := zclock_time() + c_HEARTBEAT_INTERVAL; Z_log('I: worker heartbeat'); frame := zframe_new(@c_PPP_HEARTBEAT, 1); zframe_send(frame, worker, 0); end end; zctx_destroy(ctx); end; begin doMain; Readln; end.
Комментарии к примеру:
- Как и раньше, в коде присутствует имитация отказов. Это делает код довольно сложным для отладки и опасным для повторного использования. Когда возникнет надобность в отладке такого кода, следует отключить имитацию отказов.
- Рабочий использует стратегию реконнекта, похожую на ту, которая используется в клиента шаблона "Ленивый пират", но с двумя важными отличиями: (a) реконнект выполняется через экспоненциально возрастающие промежутки времени, и (b) он выполняет его бесконечно (тогда как клиент после сбоя делает это фиксированное число раз).
start Reliable_ParanoidPirate_Broker.exe
for /l %%i in (1,1,5) do start Reliable_ParanoidPirate_Worker.exe
timeout /t 1
start Reliable_LazyPirate_Client.exe
В первой строке запускается очередь (брокера) Reliable_ParanoidPirate_Broker.exe.
Во второй строке запускается пять экземпляров рабочих Reliable_ParanoidPirate_Worker.exe.
В третьей строке задается пауза в 1 секунду для того, чтобы рабочие подключились к очереди.
В четвертой строке запускается клиент.
После запуска мы наблюдаем рабочих, имитирующих отказы (зависания и падения), и клиента, который в итоге отвалится, когда не останется доступных рабочих. Мы можем остановить и перезапустить очередь, при этом клиент рабочие выполнят реконнект и подхватят работу. И неважно, что делается с очередью и с рабочими: клиент никогда не отвалится, так как работает либо вся цепочка, либо клиент уходит в отказ.
Далее мы рассмотрим харбитинг более детально. (Продолжение)
Комментариев нет :
Отправить комментарий