(Начало - здесь).
Как и раньше, отдельный кластер будет представлен одним процессом.
Кода получилось довольно много, так как здесь объединяется логика обоих предыдущих примеров. Он достаточно хорошо имитирует кластеры, включающие клиентов и рабочих.
Эта программа уже не так проста, ее сложность приблизительно оценивается в один человеко-день.
Замечания по коду.
Эта имитация не обнаружитвает исчезновение облачного партнера. Если вы запустили несколько партнеров а затем остановили один, все остальные будут продолжать отправлять ему задания, как будто он не уходил. Можно попробовать так сделать - сразу увидим кучу сообщений от клиентов насчет потерянных запросов.
Есть два решения проблемы.
Первое: хранить информацию о количестве партеров в течении короткого времени так, что если партнер исчезает, его мощность быстро становится равной нулю.
Второе: добавить надежности в цепочку запрос - ответ.
Далее мы рассмотрим способы повышения надежности. (Продолжение).
Соберем все вместе.
Как и раньше, отдельный кластер будет представлен одним процессом.
Кода получилось довольно много, так как здесь объединяется логика обоих предыдущих примеров. Он достаточно хорошо имитирует кластеры, включающие клиентов и рабочих.
Код:
program Peering3; {$APPTYPE CONSOLE} uses // FastMM4, SysUtils , Classes , Windows , zmq_h , czmq_h , zmq_utils , math ; // Симуляция брокера партнеров (часть 3) // Прототипирование потоков данных состостояния и задач // Запуск:peering свой_порт список_портов_партнеров // // Для облачной связи используются порты: // // cloudfe (входные запросы) = свой_порт // cloudbe (запросы к партнерам) = чужие_порты // // statefe (подписка на состояние партнеров) = 1чужие порты // statebe (публикация своего состояния для партнеров) = 1свой_порт // // Для локальной связи используется протокол inproc: // localfe (входные запросы от клиентов) = localfe_inproc // localbe (запросы к рабочим) = localbe_inproc // monitor (служебные сообщения) = monitor_inproc const c_NBR_CLIENTS = 10; c_NBR_WORKERS = 5; c_WORKER_READY: byte = 1; // Сигнал "рабочий готов" // Свое собственное имя (порт ввода-вывода). // В реальности должно быть задано при конфиргурировнии узла var self: PChar = nil; function zFormat(const aFormat: string; const aArgs: array of const): string; var fFS: TFormatSettings; begin GetLocaleFormatSettings(GetUserDefaultLCID(), fFS); Result := Format(aFormat, aArgs, fFS) end; function client_task(args: Pointer): Pointer; cdecl; // Клиентская задача. Несколько секунд спит, затем порождает пачку запросов. // Это имитирует спорадическую активность, когда одновременно активно множество // клиентов и локальные рабочие должны оказаться перегружены. // Для запросов используется сокет REQ. // Для статистики данные отправляются в сокет мониторинга. var client: Pointer; monitor: Pointer; burst: Integer; task_id: pChar; pollset: zmq_pollitem_t; rc: Integer; reply: PChar; begin client := zsocket_new(args, ZMQ_REQ); rc := zsocket_connect(client, 'inproc://localfe_inproc'); monitor := zsocket_new(args, ZMQ_PUSH); rc := zsocket_connect(monitor, 'inproc://monitor_inproc'); while True do begin sleep(Random(5) * 1000); for burst := 0 to Random(10) do begin task_id := PChar('Task ' + IntToHex(Random($10000), 4) + 'h from ' + self); // Запрос со случайным значением id в 16сс rc := zstr_send(client, task_id); // Ответа ждать максимум 10 секунд, потом сообщить о проблеме zPollItemInit(pollset, client, 0, ZMQ_POLLIN, 0); rc := zmq_poll(@pollset, 1, 10 * 1000 * ZMQ_POLL_MSEC); if rc = -1 then break; // Прерван if (pollset.revents and ZMQ_POLLIN) <> 0 then begin reply := zstr_recv(client); if reply = nil then break; // Прерван // assert(string(reply) = string(task_id)); // !!!! Это - сравнение строк zstr_send(monitor, reply); end else begin zstr_send(monitor, PChar('E: CLIENT EXIT - lost task ' + task_id)); result := nil; Exit; end end end; Result := nil; end; function worker_task(args: Pointer): Pointer; cdecl; // Задача рабочего. Для подключения к балансировщику нагрузки использует сокет // REQ. То же самое, что и в предудущих peering* примерах var frame: p_zframe_t; msg: p_zmsg_t; new_msg: string; rc: Integer; worker: Pointer; str_msg: PChar; begin worker := zsocket_new(args, ZMQ_REQ); rc := zsocket_connect(worker, 'inproc://localbe_inproc'); // Сообщаем брокеру о готовноти к работе frame := zframe_new(@c_WORKER_READY, 1); rc := zframe_send(frame, worker, 0); // Process messages as they arrive while true do begin msg := zmsg_recv(worker); if msg = nil then break; // Прерван // Рабочие как бы что-то делают 0 или 1 секунду Sleep(Random(2) * 1000); frame := zmsg_last(msg); str_msg := zframe_strdup(frame); new_msg := str_msg + ' processed by ' + self; zframe_reset(frame, PChar(new_msg), Length(new_msg)); zstr_free(str_msg); zmsg_send(msg, worker); end; result := nil; end; procedure doMain; // Основная задача начинается с настройки сокетов. // Локальные фронтэнд и бэкэнд общаются с клиентами и рабочими соответственно. // Облачные фронтэнд и бэкэнд общаются с брокерами - партнерами соответственно // так же, как если те они были клиентами и рабочими. // В бэкэнд состояние регулярно публикуются сообщения о собственном состоянии. // Фронтэнд состояния подписывается на сообщения о состоянии всех бэкэндов // для сбора сообщений о состоянии. // Для сбора различных "печатных" сообщений от задач используется PULL - сокет // мониторинга. var cloudbe: Pointer; cloudfe: Pointer; cloud_capacity: Integer; ctx: p_zctx_t; data: PChar; frame: p_zframe_t; i: Integer; identity: p_zframe_t; localbe: Pointer; localfe: Pointer; local_capacity: Integer; monitor: Pointer; msg: p_zmsg_t; peer: PChar; previous: Integer; primary: array[0..3] of zmq_pollitem_t; random_peer: Integer; secondary: array[0..1] of zmq_pollitem_t; rc: Integer; size: Size_t; statebe: Pointer; statefe: Pointer; status: PChar; workers: P_zlist_t; begin if (ParamCount < 2) then begin z_Log('syntax: peering3 me {you}...'); Exit; end; // Первый аргумент командной строки - адрес брокера // Остальные аргументы - адреса партнеров self := PChar(ParamStr(1)); z_Log('I: preparing broker at ' + self); Randomize; // Подготовка локальных фронтэнда и бэкэнда ctx := zctx_new(); localfe := zsocket_new(ctx, ZMQ_ROUTER); rc := zsocket_bind(localfe, 'inproc://localfe_inproc'); localbe := zsocket_new(ctx, ZMQ_ROUTER); rc := zsocket_bind(localbe, 'inproc://localbe_inproc'); // Привязка облачного фронтэнда к конечной конкретной точке cloudfe := zsocket_new(ctx, ZMQ_ROUTER); zsocket_set_identity(cloudfe, self); zsocket_bind(cloudfe, 'tcp://*:%s', self); // Коннект облачного бэкэнда ко всем портнерам cloudbe := zsocket_new(ctx, ZMQ_ROUTER); zsocket_set_identity(cloudbe, self); for i := 2 to ParamCount do begin z_Log('I: connecting to cloud frontend at ' + ParamStr(i)); rc := zsocket_connect(cloudbe, 'tcp://127.0.0.1:%s', PChar(ParamStr(i))); end; // Привязка бэкэнда состояния к конкретной конечной точке statebe := zsocket_new(ctx, ZMQ_PUB); zsocket_bind(statebe, 'tcp://*:1%s', self); // Фронтэнд состояния подключается ко всем партнерам statefe := zsocket_new(ctx, ZMQ_SUB); zsocket_set_subscribe(statefe, ''); for i := 2 to ParamCount do begin z_Log('I: connecting to state backend at ' + ParamStr(i)); rc := zsocket_connect(statefe, 'tcp://127.0.0.1:1%s', PChar(ParamStr(i))); end; // Подготовка сокета мониторинга monitor := zsocket_new(ctx, ZMQ_PULL); rc := zsocket_bind(monitor, 'inproc://monitor_inproc'); // После привязки и коннекта всех наших сокетов стартуем дочерние // задачи: рабочих и клиентов for i := 0 to Pred(c_NBR_WORKERS) do zthread_new(@worker_task, ctx); // Запуск локальных клиентов for i := 0 to Pred(c_NBR_CLIENTS) do zthread_new(@client_task, ctx); // Очередь незанятых рабочих local_capacity := 0; cloud_capacity := 0; workers := zlist_new(); // Главный цикл состоит из двух частей. // // 1. В любом случае опрашиваем локальных и облачных рабочих и два сервисных // сокета (statefe и monitor). Если свободных рабочих нет, то нет смысла // даже смотреть на входящие запросы (они останутся во внутренних очередях ZMQ) while (true) do begin zPollItemInit(primary[0], localbe, 0, ZMQ_POLLIN, 0); zPollItemInit(primary[1], cloudbe, 0, ZMQ_POLLIN, 0); zPollItemInit(primary[2], statefe, 0, ZMQ_POLLIN, 0); zPollItemInit(primary[3], monitor, 0, ZMQ_POLLIN, 0); // Если свободных рабочих нет, ждем бесконечно (пока не появятся) // Если есть - ждем до 1 секунды rc := zmq_poll(@primary[0], 4, IfThen(local_capacity > 0, 1000 * ZMQ_POLL_MSEC, -1)); if rc = -1 then Break; // Прерван // Отлов изменения количества свободных рабочих в течении этой итерации previous := local_capacity; msg := nil; if (primary[0].revents and ZMQ_POLLIN) <> 0 then begin // Ответ от локального рабочего ****************************************** msg := zmsg_recv(localbe); if msg = nil then break; // Прерван identity := zmsg_unwrap(msg); // Кадр с идентификатором рабочего выдергиваем zlist_append(workers, identity); // ... и запоминаем в очереди Inc(local_capacity); // Если это - сигнал "Готов", сообщение вообще не обрабатывается frame := zmsg_first(msg); if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then zmsg_destroy(msg); end else if (primary[1].revents and ZMQ_POLLIN) <> 0 then begin // ...либо обработать ответ от брокера - партнера ************************ msg := zmsg_recv(cloudbe); if msg = nil then break; // Прерван // Идентификатор брокера не используется, отбрасываем identity := zmsg_unwrap(msg); zframe_destroy(identity); end; // Перенаправление ответа в облако, если он адресован брокеру if msg <> nil then for i := 2 to ParamCount do begin data := PChar(zframe_data(zmsg_first(msg))); size := zframe_size(zmsg_first(msg)); if (size = Length(ParamStr(i))) and (CompareMem(data, PChar(ParamStr(i)), size)) then begin zmsg_send(msg, cloudfe); Break end; end; // Если сообщение все еще не отправлено, перенаправить его клиенту if msg <> nil then zmsg_send(msg, localfe); // Если в сокетах statefe или в monitor есть входящие сообщения, // обрабатываем их немедленно if (primary[2].revents and ZMQ_POLLIN) <> 0 then begin // Сообщение о статусе партнера ****************************************** peer := zstr_recv(statefe); status := zstr_recv(statefe); cloud_capacity := StrToInt(status); zstr_free(peer); zstr_free(status); end; if (primary[3].revents and ZMQ_POLLIN) <> 0 then begin // Вывод сообщения мониторинга ******************************************* status := zstr_recv(monitor); z_Log(status); zstr_free(status); end; // 2. Теперь распределяем рабочим столько клиентских запросов, сколько // мы можем обработать. // Если есть локальные мощности, то опрашиваем и localfe и cloudfe. // Если есть только облачные мощности, опрашиваем только localfe. // Если есть возможность, распределяем задачи локально, иначе распределяем // задачи в облако. while (local_capacity + cloud_capacity) > 0 do begin zPollItemInit(secondary[0], localfe, 0, ZMQ_POLLIN, 0); zPollItemInit(secondary[1], cloudfe, 0, ZMQ_POLLIN, 0); rc := zmq_poll(@secondary[0], IfThen(local_capacity > 0, 2, 1), 0); assert(rc >= 0); if (secondary[0].revents and ZMQ_POLLIN) <> 0 then msg := zmsg_recv(localfe) else if (secondary[1].revents and ZMQ_POLLIN) <> 0 then msg := zmsg_recv(cloudfe) else break; // Работы нет, возвращаемся снова к primary if local_capacity > 0 then begin frame := zlist_pop(workers); // Добавляем кадр с идентификатором zmsg_wrap(msg, frame); // свободного рабочего как адрес получателя zmsg_send(msg, localbe); Dec(local_capacity); // Рабочий занят end else begin // Отправляем случайному брокеру - партнеру // PS: в общем случае, этот партнер может и не иметь свободных // рабочих, ибо cloud_capacity мог быть получен от кого угодно по // принципу round-robin random_peer := Random(ParamCount - 1) + 2; zmsg_pushmem(msg, PChar(ParamStr(random_peer)), Length(ParamStr(random_peer))); zmsg_send(msg, cloudbe); end; end; // end of while if (local_capacity <> previous) then begin // Публикуем сообщения для партнеров о собственной рабочей мощности // Для уменьшения "дребезга" делаем это to только при изменении // Формируем собственный идентификатор в составе конверта zstr_sendm(statebe, self); // Публикация нового значения рабочей мощности zstr_send(statebe, PChar(IntToStr(local_capacity))); end; end; // Подчистка хвостов при выходе while zlist_size(workers) <> 0 do begin frame := zlist_pop(workers); zframe_destroy(frame); end; zlist_destroy(workers); zctx_destroy(ctx); end; begin IsMultiThread := True; zsys_handler_set(nil); doMain; Readln; end.
Эта программа уже не так проста, ее сложность приблизительно оценивается в один человеко-день.
Замечания по коду.
- Клиентские нити обнаруживают и сообщают о невыполненных запросах. Это делается уже знакомым методом поллинга и, в случае отсутствия ответа в течении 10 секунд, печатается сообщение об ошибке.
- Клиентские нити сами ничего не печатают, вместо этого они направляют сообщение сокету мониторинга (PUSH), который в цикле собирает их (PULL) и печатает. Это первый случай использования сокетов ZeroMQ для мониторинга и логирования. К этому способу мы еще вернемся.
- Клиенты имитируют различную нагрузку, чтобы получать доступ к кластеру в 100% случайные моменты времени, чтобы задачи могли попадать в облако. Это управляется количеством клиентов и рабочих а также временем задержки в клиентских и рабочих нитях. Для более точной имитации реальной ити значения можно менять и смотреть, что получится.
- Основной цикл использует два поллинг - набора. В действительности нужно использовать три: информация о состоянии, бэкэнд (рабочие) и фронтэнд (клиенты). Как и в предыдущих прототипах, сообщения из фронтэнда на сычитываются, если нет свободный рабочих в бэкэнде.
- Для облачных (между "кластерами") потоков данных в качестве транспорта используется все тот же протокол tcp. Для локальных потоков данных в этот раз я решил использовать протокол inproc.
В процессе разработки этого примера возникли следующие трудности:
- Из-за потерь сообщений (запросов или ответов) клиенты зависают. Напомню, что сокет ROUTER отбрасывет сообщения, которые не могут быть маршрутизированы. Для решения проблемы был модифицирована клиентская нить, чтобы она обнаруживала такую ситуацию. В процессе поиска проблемы использовался диагностический метод zmsg_dump(), который был размещен после каждого приема и перед каждой отправкой в главном цикле, пока проблема не разрешилась.
- В основном цикле ошибочно читался более чем один готовый сокет. Из-за этого первое сообщение терялось. Алгоритмс был изменен так, что чтение выполнялось только из первого готового сокета.
- Класс zmsg неправильно декодирует UUID-ы как строки C. Это происходит из-за того, что UUID-ы содержат нулевый байты. Пофиксено методом замены UUID-ов на печатные шестнадцатеричные строки.
Эта имитация не обнаружитвает исчезновение облачного партнера. Если вы запустили несколько партнеров а затем остановили один, все остальные будут продолжать отправлять ему задания, как будто он не уходил. Можно попробовать так сделать - сразу увидим кучу сообщений от клиентов насчет потерянных запросов.
Есть два решения проблемы.
Первое: хранить информацию о количестве партеров в течении короткого времени так, что если партнер исчезает, его мощность быстро становится равной нулю.
Второе: добавить надежности в цепочку запрос - ответ.
Далее мы рассмотрим способы повышения надежности. (Продолжение).
Комментариев нет :
Отправить комментарий