(Начало).
Если кто-то возжелает написать и брокер в объектном стиле, напоминаю, что вторым аргументом метода zhash_freefn() должна быть ссылка на функцию типа zhash_free_fn:
Ничего нового в нижеследующем коде нет, за исключением того, что он написан с использованием объектов и классов Delphi.
Код клиента в "объектном" стиле::
Код клиента в "объектном" стиле::
program mdclient; {$APPTYPE CONSOLE} uses SysUtils, zmq_h, czmq_h, mdcliapi in 'mdcliapi.pas', mdp in 'mdp.pas'; procedure DoIt; var i: Integer; fSession: TmdClient; fVerbose: Boolean; reply: p_zmsg_t; request: p_zmsg_t; begin fVerbose := (ParamCount > 0) and (ParamStr(1) = '-v'); Writeln(ParamStr(1)); fSession := TmdClient.Create('tcp://localhost:5555', fVerbose); try i := 0; for i := 0 to 99999 do begin request := zmsg_new(); zmsg_pushstr(request, 'Hello world'); reply := fSession.Send('echo', request); if reply <> nil then zmsg_destroy(reply) else break; // Прерывание или отказ end; zclock_log('%d requests / replies processed'#10, i); finally fSession.Free; end; end; begin DoIt; Readln; end.
Код API клиента в "объектном" стиле::
unit mdcliapi; interface uses mdp , zmq_h , czmq_h , ZMQ_Utils; type TmdClient = class(TObject) private Ctx: p_zctx_t; // ZMQ контекст Broker: string; sctClient: Pointer; // Сокет для связи с брокером Verbose: Boolean; // Протоколирование в stdout Timeout: integer; // Таймаут запроса Retries: integer; // Число попыток procedure ConnectToBrocker; public constructor Create(aBroker: string; aVerbose: Boolean); destructor Destroy; override; function Send(service: PChar; var request: p_zmsg_t): p_zmsg_t; end; implementation uses SysUtils; procedure TmdClient.ConnectToBrocker; begin if sctClient <> nil then zsocket_destroy(ctx, sctClient); sctClient := zsocket_new(ctx, ZMQ_REQ); zmq_connect(sctClient, PChar(broker)); if verbose then zclock_log('I: connecting to broker at %s...', Broker); end; constructor TmdClient.Create(aBroker: string; aVerbose: Boolean); begin Ctx := zctx_new(); Broker := aBroker; Verbose := aVerbose; Timeout := 2500; // msecs Retries := 3; // Before we abandon ConnectToBrocker(); end; destructor TmdClient.Destroy; begin inherited; zctx_destroy(Ctx); Broker := ''; end; function TmdClient.Send(service: PChar; var request: p_zmsg_t): p_zmsg_t; var header: p_zframe_t; item: zmq_pollitem_t; msg: p_zmsg_t; rc: Integer; reply_service: p_zframe_t; retries_left: Integer; begin assert(request <> nil); // В соответствии с требованиями протокола, должны быть фреймы-префиксы // Фрейм 1: "MDPCxy" (шесть байт, MDP/Client x.y) // Фрейм 2: Имя сервиса (печатная строка) zmsg_pushstr(request, service); zmsg_pushstr(request, cMDPC_CLIENT); if verbose then begin zclock_log('I: send request to ''%s'' service:', service); zmsg_print(request); end; retries_left := Retries; while (retries_left > 0) and (zctx_interrupted = 0) do begin msg := zmsg_dup(request); // Работаем с копией (вдруг оригинал пригодится?) zmsg_send(msg, sctClient); zPollItemInit(item, sctClient, 0, ZMQ_POLLIN, 0); // При любом блокирующем вызове (libzmq), в случае ошибки будет -1; // Теоретически, следует учесть разные коды завершения, // но на практике достаточно учесть {EINTR} (Ctrl-C): rc := zmq_poll(@item, 1, Timeout * ZMQ_POLL_MSEC); if rc = -1 then break; // Прерван // Что-то приняли, обрабатываем if (item.revents and ZMQ_POLLIN) <> 0 then begin msg := zmsg_recv(sctClient); if Verbose then begin zclock_log('I: received reply:'); zmsg_print(msg); end; // В реальном коде обрабботка отказа должна быть более тщательной assert(zmsg_size(msg) >= 3); header := zmsg_pop(msg); assert(zframe_streq(header, cMDPC_CLIENT)); zframe_destroy(header); reply_service := zmsg_pop(msg); assert(zframe_streq(reply_service, service)); zframe_destroy(&reply_service); zmsg_destroy(request); Result := msg; // Успех Exit; end else if s_Dec(retries_left) > 0 then begin if (Verbose) then zclock_log('W: no reply, reconnecting...'); ConnectToBrocker; end else begin if (Verbose) then zclock_log('W: permanent error, abandoning'); break; // Всё end; end; if zctx_interrupted <> 0 then zclock_log('W: interrupt received, killing client...'#10); zmsg_destroy(request); Result := nil; end; end.
Код Рабочего в "объектном" стиле::
program mdworker; {$APPTYPE CONSOLE} uses SysUtils, zmq_h, czmq_h, mdwrkapi in 'mdwrkapi.pas', mdp in 'mdp.pas'; procedure DoIt; var fVerbose: Boolean; fReply: p_zmsg_t; fRequest: p_zmsg_t; fSession: TmdWorker; begin fVerbose := (ParamCount > 0) and (ParamStr(1) = '-v'); fSession := TmdWorker.Create('tcp://localhost:5555', 'echo', fVerbose); fReply := nil; while (true) do begin fRequest := fSession.Recv(fReply); if (fRequest = nil) then break; // Рабочий был прерван fReply := fRequest; // Эхо ... :-) end; fSession.Free; Exit; end; begin doIt; Readln; end.
Код API рабочего в "объектном" стиле::
unit mdwrkapi; interface uses zmq_h , czmq_h ; type TmdWorker = class(TObject) Ctx: p_zctx_t; // Наш контекст Broker: string; Service: string; sctWorker: Pointer; // Сокет для связи с брокером Socket to broker Verbose: Boolean; // Протоколировать действия в stdout // Управление хартбитингом HeartbeatAt: uint64_t; Liveness: size_t; // Сколько осталось попыток Heartbeat: integer; // Задержка хартбитинга, в миллисекундах Reconnect: Integer; // Задержка реконнекта в миллисекундах ExpectReply: Boolean; ReplyTo: p_zframe_t; private procedure ConnectToBroker; procedure SendToBroker(aCommand, aOption: PChar; aMsg: p_zmsg_t); public constructor Create(const aBroker, aService: string; aVerbose: Boolean); destructor Destroy; override; function Recv(var reply: p_zmsg_t): p_zmsg_t; end; const // Параметры надежности cHEARTBEAT_LIVENESS = 3; // 3-5 достаточно implementation uses ZMQ_Utils, mdp; constructor TmdWorker.Create(const aBroker, aService: string; aVerbose: Boolean); begin assert(aBroker <> ''); assert(aService <> ''); Ctx := zctx_new(); Broker := aBroker; Service := aService; Verbose := aVerbose; Heartbeat := 2500; // msecs Reconnect := 2500; // msecs ConnectToBroker(); end; destructor TmdWorker.Destroy; begin inherited; zctx_destroy(Ctx); Broker := ''; Service := ''; end; function TmdWorker.Recv(var reply: p_zmsg_t): p_zmsg_t; // Метод сперва отсылает брокеру ответ, а затем ждет нового запроса. // Название метода странное, угу. // Отправляет ответ брокеру, если он есть и ждет следующего запроса var fCommand: p_zframe_t; fEmpty: p_zframe_t; fHeader: p_zframe_t; fItem: zmq_pollitem_t; fMsg: p_zmsg_t; RC: Integer; begin // Формирование и отсылка ответа (reply), если он пустой assert((reply <> nil) or (not ExpectReply)); if (reply <> nil) then begin assert(ReplyTo <> nil); zmsg_wrap(reply, ReplyTo); SendToBroker(cMDPW_REPLY, nil, reply); zmsg_destroy(reply); end; ExpectReply := true; while (true) do begin zPollItemInit(fItem, sctWorker, 0, ZMQ_POLLIN, 0); RC := zmq_poll(@fItem, 1, Heartbeat * ZMQ_POLL_MSEC); if RC = -1 then break; // Прерван if (fItem.revents and ZMQ_POLLIN) <> 0 then begin fMsg := zmsg_recv(sctWorker); if fMsg = nil then break; // Прерван if Verbose then begin zclock_log('I: received message from broker:'); zmsg_print(fMsg); end; Liveness := cHEARTBEAT_LIVENESS; // Ошибки не обрабатываем, просто шумим assert-от assert(zmsg_size(fMsg) >= 3); fEmpty := zmsg_pop(fMsg); assert(zframe_streq(fEmpty, '')); zframe_destroy(fEmpty); fHeader := zmsg_pop(fMsg); assert(zframe_streq(fHeader, cMDPW_WORKER)); zframe_destroy(fHeader); fCommand := zmsg_pop(fMsg); if (zframe_streq(fCommand, cMDPW_REQUEST)) then begin // Вообще-то, мы должны извлечь и сохранить все адреса вплоть до // пустого, но сейчас сохранем просто один... ReplyTo := zmsg_unwrap(fMsg); zframe_destroy(fCommand); // В этом месте мы должны обрабатывать сообщение // Сейчас просто возвращаем его в приложение Result := fMsg; // Запрос на обработку Exit; end else if (zframe_streq(fCommand, cMDPW_HEARTBEAT)) then // ; // В случае хартбита ничего не делаем else if (zframe_streq(fCommand, cMDPW_DISCONNECT)) then ConnectToBroker() else begin zclock_log('E: invalid input message'); zmsg_print(fMsg); end; zframe_destroy(&fCommand); zmsg_destroy(&fMsg); end else if s_Dec(Liveness) = 0 then begin if Verbose then zclock_log('W: disconnected from broker - retrying...'); zclock_sleep(Reconnect); ConnectToBroker(); end; // Отправить хратбит, если пора if zclock_time() > HeartbeatAt then begin SendToBroker(cMDPW_HEARTBEAT, nil, nil); HeartbeatAt := zclock_time() + Heartbeat; end; end; if zctx_interrupted() <> 0 then z_log('W: interrupt received, killing worker...'#10); Result := nil; end; procedure TmdWorker.ConnectToBroker; // Коннект или реконнект к брокеру begin if sctWorker <> nil then // Полный дисконнект, с разрушенеим сокета zsocket_destroy(Ctx, sctWorker); sctWorker := zsocket_new(Ctx, ZMQ_DEALER); zmq_connect(sctWorker, PChar(Broker)); if Verbose then zclock_log('I: connecting to broker at %s...', PChar(Broker)); // Регистрация сервиса в брокере SendToBroker(cMDPW_READY, PChar(Service), nil); // Если число попыток (liveness) обнулилось, считаем, что брокер отключен Liveness := cHEARTBEAT_LIVENESS; HeartbeatAt := zclock_time() + heartbeat; end; procedure TmdWorker.SendToBroker(aCommand, aOption: PChar; aMsg: p_zmsg_t); // Отправка сообщения брокеру // Если сообщения нет (пусто), создать его самостоятельно begin if aMsg <> nil then aMsg := zmsg_dup(aMsg) else aMsg := zmsg_new(); // Формирование конверта сообщения в сответствии с протоколом if aOption <> nil then zmsg_pushstr(aMsg, aOption); zmsg_pushstr(aMsg, aCommand); zmsg_pushstr(aMsg, cMDPW_WORKER); zmsg_pushstr(aMsg, ''); if Verbose then begin zclock_log('I: sending %s to broker', mdps_commands[Byte(aCommand^)]); zmsg_print(aMsg); end; zmsg_send(aMsg, sctWorker); end; end.
zhash_free_fn = procedure(data: Pointer); cdecl;Чтобы описать такой метод в рамках класса, следует использовать модификаторы class и static
class procedure service_destroy(service: Pointer); static;Честно говоря, я бы не стал заморачиваться с zhash_lookup(), а использовал бы вместо них, например, дельфийские TStringList с включенной сортировкой.