суббота, 1 ноября 2014 г.

07. ZeroMQ: обработка ошибок. ETERM.

(Начало здесь)

Обработка ошибок. ETERM.

Обработка ошибок ZeroMQ основана на двух положениях:
  1. Процессы уязвимы по отношению к внутреннем ошибкам.
  2. Внешние ошибки (и атаки) можно обработать (отразить атаки).

В качестве примера приводится функционирование живой клетки, которая самоуничтожается при внутреннем сбое и максимально долго борется с внешними угрозами.



Надежность кода должна обеспечиваться использованием Assert(). Срабатывание Assert() вызывает либо завершение приложения, либо исключительную ситуацию.
Когда ZeroMQ обнаруживает внешнюю проблему, она возвращает соответствующий код завершения. В редких случаях, ZMQ показывает сообщения "молча", если нет очевидной стратегии, позволяющей восстановиться после ошибки.

В рассмотренных ранее примерах обработки ошибок не было.
В реальном коде необходимо анализировать код завершения вызова каждого метода ZMQ.


Существует несколько простых правил, ноги которых растут еще с соглашений POSIX:
  • Методы, которые создают объекты, в случае ошибки возвращают nil;
  • Методы, которые обрабатывают данные, могут вернуть число обработанных байт, или -1 в случае ошибки;
  • Другие методы возвращают 0, когда все ОК, и, в случае ошибки - ненулевой код ошибке;
  • Код ошибки доступен через errno (для ОС POSIX) или через метод zmq_errno();
  • Описание ошибки (например, для логирования) можно получить с помощью метода zmq_strerror(). 

Пример:
  fContext := zmq_ctx_new(); // Инициализация
  Assert(fContext <> nil);

  fSocketFrontEnd := zmq_socket(fContext, ZMQ_ROUTER);
  Assert(fSocketFrontEnd <> nil);

  fSocketBackEnd := zmq_socket(fContext, ZMQ_DEALER);
  Assert(fSocketBackEnd <> nil);

  fRC := zmq_bind(fSocketFrontEnd, 'tcp://*:5559'); // Конечная точка для клиентов
  Assert(fRC <> -1, 'Bind failed: tcp://*:5559');

  fRC := zmq_bind(fSocketBackEnd, 'tcp://*:5560'); // Конечная точка для сервисов
  Assert(fRC <> -1, 'Bind failed: tcp://*:5560');


 Есть две исключительные ситуации, которые могут обрабатываться как некритические:
  1. когда ваш код принимает сообщение с ZMQ_NOWAIT ("асинхронно"), но данных не ожидается. ZMQ вернет -1 и установит код ошибка равным EAGAIN;
  2. когда одна нить вызывает zmq_ctx_destroy(), а вторая все еще выполняет работу в блокирующем режиме, то вызов zmq_ctx_destroy() вызывает закрытие контекста и всех блокирующих вызовов с кодом завершения -1, а код ошибки устанавливается равным ETERM.

После того, как код будет отлажен и "вылизан", Asserts() могут быть отключены опциями компиляции. Однако, не стоит компилировать саму библиотеку ZMQ с отключенными assert() - запросто можно прозевать проблему в самом неожиданном месте.

Рассмотрим способы "чистого" завершения процессов. В качестве примера вспомним параллельный трубопровод из примера.

Мы возьмем пример параллельного трубопровода из предыдущего раздела. Если мы в фоне стартовали множество рабочих процессов Worker, то теперь мы хотим уничтожить их всех, когда все пакетное задание будет выполнено. Будем делать это, отправляя рабочим процессам сообщение "умри!". Этим будет заниматься процесс - сборщик Sink (так как он знает, когда завершается пакетное задание).

Каким же образом подключить сборщик Sink к рабочим процессам? Сокеты PUSH/PULL допускают передачу только в одну сторону. Можно использовать сокеты другого типа, или смешать несколько потоков. Попробуем следующее: используем схему "Издатель-Подписчик" (pub-sub) для отправки рабочим процессам сообщения "умри!".

Описание:
Сборщик (Sink) создает сокет - издатель (PUB) в новой конечной точке.
Рабочие процессы (Worker) связывают свои входные сокеты с этой конечной точкой.
Когда сборщик (Sink) определяет, что задание выполнено, он посылает сигнал "умри!" в свой сокет - издатель (PUB).
Когда рабочий процесс (Worker) обнаруживает сообщение "умри!", он завершается.

Топология:



Сборщик почти не меняется, добавится еще один сокет и отправка сообщения "умри!":

var
...
  fSocketController: Pointer;
...
begin
...
  // Сокет для отправки сигнала "умри!"
  fSocketController := zmq_socket(fContext, ZMQ_PUB);
  zmq_bind(fSocketController, 'tcp://*:5559');
...
  // Отправка сигнала "умри!" рабочим процессам
  zmq_send(fSocketController, PChar('KILL')^, 4, 0);
...


Полный код сборщика:
program PL_Sink;

{$APPTYPE CONSOLE}

uses
  SysUtils, Windows, ZMQ_h;

const
  c_task_count = 100;

var
  fContext: Pointer;
  fSocketReceiver: Pointer;
  fSocketController: Pointer;
  fStart: Cardinal;
  fTaskCount: Integer;
begin
 // Инициализация
  fContext := zmq_ctx_new();

  // Сокет для приема сигналов
  fSocketReceiver := zmq_socket(fContext, ZMQ_PULL);
  zmq_bind(fSocketReceiver, 'tcp://*:5558');

  // Сокет для отправки сигнала "умри!"
  fSocketController := zmq_socket(fContext, ZMQ_PUB);
  zmq_bind(fSocketController, 'tcp://*:5559');

  // Ожидание первого сигнала о старте пакета задач
  zmq_recv(fSocketReceiver, nil, 0, 0);

  // Фиксация времени начала выполнения пакета
  fStart := GetTickCount();
  // Ждем подтверждения выполнения от 100 рабочих процессов
  for fTaskCount := 0 to Pred(c_task_count) do begin
    zmq_recv(fSocketReceiver, nil, 0, 0);

    if (fTaskCount mod 10 = 0) then // "Статус - бар" :)
      Write(':')
    else
      Write('.')
  end;
  // Вычисление и показ времени выполнения пакета задач
  Writeln('Total elapsed time: ', GetTickCount() - fStart, 'ms');

  // Отправка сигнала "умри!" рабочим процессам
  zmq_send(fSocketController, PChar('KILL'), 4, 0);

  zmq_close(fSocketReceiver);
  zmq_close(fSocketController);
  zmq_ctx_destroy(fContext);
  Readln;

end.



Естественно, рабочий процесс "Worker" тоже придется переделать. Теперь "Worker" управляется двумя сокетами: PULL - для получения задачи, и SUB - для получения команд управления. И не забываем, что SUB сокет должен быть настроен:
  zmq_setsockopt(fSocketController, ZMQ_SUBSCRIBE, nil, 0);


Используем уже знакомую технику поллинга с zmq_poll (), которую уже применяли ранее.
Код рабочего процесса:
  

program PL_Worker;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ_h;

var
  fContext: Pointer;
  fSocketReceiver: Pointer;
  fSocketSender: Pointer;
  fSocketController: Pointer;
  fStrMsg: string;
  fLen: Integer;
  fMsg: zmq_msg_t;
  fPollItems: array[0..1] of zmq_pollitem_t;
begin
  fContext := zmq_ctx_new();

 // Сокет для приема сообщений
  fSocketReceiver := zmq_socket(fContext, ZMQ_PULL);
  zmq_connect(fSocketReceiver, 'tcp://localhost:5557');

// Сокет для отправки сообщений
  fSocketSender := zmq_socket(fContext, ZMQ_PUSH);
  zmq_connect(fSocketSender, 'tcp://localhost:5558');

// Сокет для приема управляющх сигналов
  fSocketController := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocketController, 'tcp://localhost:5559');
  zmq_setsockopt(fSocketController, ZMQ_SUBSCRIBE, nil, 0);

  fPollItems[0].socket := fSocketReceiver;
  fPollItems[0].fd := 0;
  fPollItems[0].events := ZMQ_POLLIN;
  fPollItems[0].revents := 0;
  fPollItems[1].socket := fSocketController;
  fPollItems[1].fd := 0;
  fPollItems[1].events := ZMQ_POLLIN;
  fPollItems[1].revents := 0;

// Бесконечный цикл выполнения заданий
  while True do begin
    zmq_poll(@fPollItems[0], 2, -1);
    if (fPollItems[0].revents and ZMQ_POLLIN) <> 0 then begin
      zmq_msg_init(@fMsg);
      fLen := zmq_msg_recv(@fMsg, fSocketReceiver, 0); // Получение задания
      SetLength(fStrMsg, fLen div SizeOf(Char)); // Перевод буфера сообщения в строку
      Move(zmq_msg_data(@fMsg)^, PChar(fStrMsg)^, fLen);
      Writeln(fStrMsg); // Отображение в консоли процесса "работы"
      Sleep(StrToInt(fStrMsg)); // "Полезная" работа...
      zmq_send(fSocketSender, nil, 0, 0);   // Отправка сигнала сборщику результата
    end;
   // Все, что получаем из контроллера, считаем командой 'KILL'
    if (fPollItems[1].revents and ZMQ_POLLIN) <> 0 then
      Break; // Выход из цикла обработки
  end;
  zmq_close(fSocketReceiver);
  zmq_close(fSocketSender);
  zmq_close(fSocketController);
  zmq_ctx_destroy(fContext);

end.


Теперь запускаем: процесс "Sink", один или несколько процессов "Worker", процесс "Ventilator". В консоли процесса "Ventilator" жмем Enter и наблюдаем примерно такую картину:
...за исключением того, что процессы "Worker" по завершению пакетного задания будут завершены: по завершению работы процесс "Sink" посылает сигнал "умри!" всем подписчикам (процессам "Worker").

 Обработка Crtl+C для консольных приложений Windows.



Казалось бы, нажали и хрясь - приложение убито. Если бы.

Как было сказано выше - можно получить зависон, особенно если приложение многонитевое.
И надо бы пройтись по всем нитям и "сообщить" им о своих намерениях, чтобы те завершили работу с сокетами

В общем, "все не так однозначно".

Все, что написано в документации по ZMQ - касается всяческих Linux.

Нам, дельфистам, придется использовать специальные обработчики: подключаем в uses модуль Windows и настраиваем:

  


uses
  ...Windows, ...;
...
begin
...
  Windows.SetConsoleCtrlHandler(CtrlCHandler, True);


Шаблон для CtrlCHandler:
  

function console_handler( dwCtrlType: DWORD ): BOOL;
var
  i: Integer;
begin
  if CTRL_C_EVENT = dwCtrlType then
  begin
    // Выполняем "мягкое" завершение
...
  end else 
    result := False;
end;


А уж как выполнить "мягкое" завершение - зависит от текущей задачи.
...
...
Поговорим о мультитрейдинге. (Продолжение)


Комментариев нет :

Отправить комментарий