(Начало).
Если кто-то возжелает написать и брокер в объектном стиле, напоминаю, что вторым аргументом метода zhash_freefn() должна быть ссылка на функцию типа zhash_free_fn:
Ничего нового в нижеследующем коде нет, за исключением того, что он написан с использованием объектов и классов Delphi.
Код клиента в "объектном" стиле::
Код клиента в "объектном" стиле::
program mdclient;
{$APPTYPE CONSOLE}
uses
SysUtils,
zmq_h,
czmq_h,
mdcliapi in 'mdcliapi.pas',
mdp in 'mdp.pas';
procedure DoIt;
var
i: Integer;
fSession: TmdClient;
fVerbose: Boolean;
reply: p_zmsg_t;
request: p_zmsg_t;
begin
fVerbose := (ParamCount > 0) and (ParamStr(1) = '-v');
Writeln(ParamStr(1));
fSession := TmdClient.Create('tcp://localhost:5555', fVerbose);
try
i := 0;
for i := 0 to 99999 do begin
request := zmsg_new();
zmsg_pushstr(request, 'Hello world');
reply := fSession.Send('echo', request);
if reply <> nil then
zmsg_destroy(reply)
else
break; // Прерывание или отказ
end;
zclock_log('%d requests / replies processed'#10, i);
finally
fSession.Free;
end;
end;
begin
DoIt;
Readln;
end.
Код API клиента в "объектном" стиле::
unit mdcliapi;
interface
uses
mdp
, zmq_h
, czmq_h
, ZMQ_Utils;
type
TmdClient = class(TObject)
private
Ctx: p_zctx_t; // ZMQ контекст
Broker: string;
sctClient: Pointer; // Сокет для связи с брокером
Verbose: Boolean; // Протоколирование в stdout
Timeout: integer; // Таймаут запроса
Retries: integer; // Число попыток
procedure ConnectToBrocker;
public
constructor Create(aBroker: string; aVerbose: Boolean);
destructor Destroy; override;
function Send(service: PChar; var request: p_zmsg_t): p_zmsg_t;
end;
implementation
uses
SysUtils;
procedure TmdClient.ConnectToBrocker;
begin
if sctClient <> nil then
zsocket_destroy(ctx, sctClient);
sctClient := zsocket_new(ctx, ZMQ_REQ);
zmq_connect(sctClient, PChar(broker));
if verbose then
zclock_log('I: connecting to broker at %s...', Broker);
end;
constructor TmdClient.Create(aBroker: string; aVerbose: Boolean);
begin
Ctx := zctx_new();
Broker := aBroker;
Verbose := aVerbose;
Timeout := 2500; // msecs
Retries := 3; // Before we abandon
ConnectToBrocker();
end;
destructor TmdClient.Destroy;
begin
inherited;
zctx_destroy(Ctx);
Broker := '';
end;
function TmdClient.Send(service: PChar; var request: p_zmsg_t): p_zmsg_t;
var
header: p_zframe_t;
item: zmq_pollitem_t;
msg: p_zmsg_t;
rc: Integer;
reply_service: p_zframe_t;
retries_left: Integer;
begin
assert(request <> nil);
// В соответствии с требованиями протокола, должны быть фреймы-префиксы
// Фрейм 1: "MDPCxy" (шесть байт, MDP/Client x.y)
// Фрейм 2: Имя сервиса (печатная строка)
zmsg_pushstr(request, service);
zmsg_pushstr(request, cMDPC_CLIENT);
if verbose then begin
zclock_log('I: send request to ''%s'' service:', service);
zmsg_print(request);
end;
retries_left := Retries;
while (retries_left > 0) and (zctx_interrupted = 0) do begin
msg := zmsg_dup(request); // Работаем с копией (вдруг оригинал пригодится?)
zmsg_send(msg, sctClient);
zPollItemInit(item, sctClient, 0, ZMQ_POLLIN, 0);
// При любом блокирующем вызове (libzmq), в случае ошибки будет -1;
// Теоретически, следует учесть разные коды завершения,
// но на практике достаточно учесть {EINTR} (Ctrl-C):
rc := zmq_poll(@item, 1, Timeout * ZMQ_POLL_MSEC);
if rc = -1 then
break; // Прерван
// Что-то приняли, обрабатываем
if (item.revents and ZMQ_POLLIN) <> 0 then begin
msg := zmsg_recv(sctClient);
if Verbose then begin
zclock_log('I: received reply:');
zmsg_print(msg);
end;
// В реальном коде обрабботка отказа должна быть более тщательной
assert(zmsg_size(msg) >= 3);
header := zmsg_pop(msg);
assert(zframe_streq(header, cMDPC_CLIENT));
zframe_destroy(header);
reply_service := zmsg_pop(msg);
assert(zframe_streq(reply_service, service));
zframe_destroy(&reply_service);
zmsg_destroy(request);
Result := msg; // Успех
Exit;
end
else if s_Dec(retries_left) > 0 then begin
if (Verbose) then
zclock_log('W: no reply, reconnecting...');
ConnectToBrocker;
end
else begin
if (Verbose) then
zclock_log('W: permanent error, abandoning');
break; // Всё
end;
end;
if zctx_interrupted <> 0 then
zclock_log('W: interrupt received, killing client...'#10);
zmsg_destroy(request);
Result := nil;
end;
end.
Код Рабочего в "объектном" стиле::
program mdworker;
{$APPTYPE CONSOLE}
uses
SysUtils,
zmq_h,
czmq_h,
mdwrkapi in 'mdwrkapi.pas',
mdp in 'mdp.pas';
procedure DoIt;
var
fVerbose: Boolean;
fReply: p_zmsg_t;
fRequest: p_zmsg_t;
fSession: TmdWorker;
begin
fVerbose := (ParamCount > 0) and (ParamStr(1) = '-v');
fSession := TmdWorker.Create('tcp://localhost:5555', 'echo', fVerbose);
fReply := nil;
while (true) do begin
fRequest := fSession.Recv(fReply);
if (fRequest = nil) then
break; // Рабочий был прерван
fReply := fRequest; // Эхо ... :-)
end;
fSession.Free;
Exit;
end;
begin
doIt;
Readln;
end.
Код API рабочего в "объектном" стиле::
unit mdwrkapi;
interface
uses
zmq_h
, czmq_h
;
type
TmdWorker = class(TObject)
Ctx: p_zctx_t; // Наш контекст
Broker: string;
Service: string;
sctWorker: Pointer; // Сокет для связи с брокером Socket to broker
Verbose: Boolean; // Протоколировать действия в stdout
// Управление хартбитингом
HeartbeatAt: uint64_t;
Liveness: size_t; // Сколько осталось попыток
Heartbeat: integer; // Задержка хартбитинга, в миллисекундах
Reconnect: Integer; // Задержка реконнекта в миллисекундах
ExpectReply: Boolean;
ReplyTo: p_zframe_t;
private
procedure ConnectToBroker;
procedure SendToBroker(aCommand, aOption: PChar; aMsg: p_zmsg_t);
public
constructor Create(const aBroker, aService: string; aVerbose: Boolean);
destructor Destroy; override;
function Recv(var reply: p_zmsg_t): p_zmsg_t;
end;
const
// Параметры надежности
cHEARTBEAT_LIVENESS = 3; // 3-5 достаточно
implementation
uses
ZMQ_Utils, mdp;
constructor TmdWorker.Create(const aBroker, aService: string; aVerbose: Boolean);
begin
assert(aBroker <> '');
assert(aService <> '');
Ctx := zctx_new();
Broker := aBroker;
Service := aService;
Verbose := aVerbose;
Heartbeat := 2500; // msecs
Reconnect := 2500; // msecs
ConnectToBroker();
end;
destructor TmdWorker.Destroy;
begin
inherited;
zctx_destroy(Ctx);
Broker := '';
Service := '';
end;
function TmdWorker.Recv(var reply: p_zmsg_t): p_zmsg_t;
// Метод сперва отсылает брокеру ответ, а затем ждет нового запроса.
// Название метода странное, угу.
// Отправляет ответ брокеру, если он есть и ждет следующего запроса
var
fCommand: p_zframe_t;
fEmpty: p_zframe_t;
fHeader: p_zframe_t;
fItem: zmq_pollitem_t;
fMsg: p_zmsg_t;
RC: Integer;
begin
// Формирование и отсылка ответа (reply), если он пустой
assert((reply <> nil) or (not ExpectReply));
if (reply <> nil) then begin
assert(ReplyTo <> nil);
zmsg_wrap(reply, ReplyTo);
SendToBroker(cMDPW_REPLY, nil, reply);
zmsg_destroy(reply);
end;
ExpectReply := true;
while (true) do begin
zPollItemInit(fItem, sctWorker, 0, ZMQ_POLLIN, 0);
RC := zmq_poll(@fItem, 1, Heartbeat * ZMQ_POLL_MSEC);
if RC = -1 then
break; // Прерван
if (fItem.revents and ZMQ_POLLIN) <> 0 then begin
fMsg := zmsg_recv(sctWorker);
if fMsg = nil then
break; // Прерван
if Verbose then begin
zclock_log('I: received message from broker:');
zmsg_print(fMsg);
end;
Liveness := cHEARTBEAT_LIVENESS;
// Ошибки не обрабатываем, просто шумим assert-от
assert(zmsg_size(fMsg) >= 3);
fEmpty := zmsg_pop(fMsg);
assert(zframe_streq(fEmpty, ''));
zframe_destroy(fEmpty);
fHeader := zmsg_pop(fMsg);
assert(zframe_streq(fHeader, cMDPW_WORKER));
zframe_destroy(fHeader);
fCommand := zmsg_pop(fMsg);
if (zframe_streq(fCommand, cMDPW_REQUEST)) then begin
// Вообще-то, мы должны извлечь и сохранить все адреса вплоть до
// пустого, но сейчас сохранем просто один...
ReplyTo := zmsg_unwrap(fMsg);
zframe_destroy(fCommand);
// В этом месте мы должны обрабатывать сообщение
// Сейчас просто возвращаем его в приложение
Result := fMsg; // Запрос на обработку
Exit;
end
else
if (zframe_streq(fCommand, cMDPW_HEARTBEAT)) then
// ; // В случае хартбита ничего не делаем
else
if (zframe_streq(fCommand, cMDPW_DISCONNECT)) then
ConnectToBroker()
else begin
zclock_log('E: invalid input message');
zmsg_print(fMsg);
end;
zframe_destroy(&fCommand);
zmsg_destroy(&fMsg);
end
else
if s_Dec(Liveness) = 0 then begin
if Verbose then
zclock_log('W: disconnected from broker - retrying...');
zclock_sleep(Reconnect);
ConnectToBroker();
end;
// Отправить хратбит, если пора
if zclock_time() > HeartbeatAt then begin
SendToBroker(cMDPW_HEARTBEAT, nil, nil);
HeartbeatAt := zclock_time() + Heartbeat;
end;
end;
if zctx_interrupted() <> 0 then
z_log('W: interrupt received, killing worker...'#10);
Result := nil;
end;
procedure TmdWorker.ConnectToBroker;
// Коннект или реконнект к брокеру
begin
if sctWorker <> nil then // Полный дисконнект, с разрушенеим сокета
zsocket_destroy(Ctx, sctWorker);
sctWorker := zsocket_new(Ctx, ZMQ_DEALER);
zmq_connect(sctWorker, PChar(Broker));
if Verbose then
zclock_log('I: connecting to broker at %s...', PChar(Broker));
// Регистрация сервиса в брокере
SendToBroker(cMDPW_READY, PChar(Service), nil);
// Если число попыток (liveness) обнулилось, считаем, что брокер отключен
Liveness := cHEARTBEAT_LIVENESS;
HeartbeatAt := zclock_time() + heartbeat;
end;
procedure TmdWorker.SendToBroker(aCommand, aOption: PChar; aMsg: p_zmsg_t);
// Отправка сообщения брокеру
// Если сообщения нет (пусто), создать его самостоятельно
begin
if aMsg <> nil then
aMsg := zmsg_dup(aMsg)
else
aMsg := zmsg_new();
// Формирование конверта сообщения в сответствии с протоколом
if aOption <> nil then
zmsg_pushstr(aMsg, aOption);
zmsg_pushstr(aMsg, aCommand);
zmsg_pushstr(aMsg, cMDPW_WORKER);
zmsg_pushstr(aMsg, '');
if Verbose then begin
zclock_log('I: sending %s to broker',
mdps_commands[Byte(aCommand^)]);
zmsg_print(aMsg);
end;
zmsg_send(aMsg, sctWorker);
end;
end.
zhash_free_fn = procedure(data: Pointer); cdecl;Чтобы описать такой метод в рамках класса, следует использовать модификаторы class и static
class procedure service_destroy(service: Pointer); static;
Честно говоря, я бы не стал заморачиваться с zhash_lookup(), а использовал бы вместо них, например, дельфийские TStringList с включенной сортировкой.



