пятница, 31 октября 2014 г.

03. ZeroMQ: "Параллельный трубопровод" (Parallel Pipeline). Задача для суперкомпьютеров.

Теперь попробуем разобраться с более удивительными вещами. (Начало здесь)



Рассмотрим топологию:


Процесс Ventilator - ставит задачи, которые будут решаться в параллельно выполняемых процессах Worker.
Процесс Worker - рабочий процесс, выполняющий поставленную задачу.
Процесс Sink - сборщик результатов от процессов Worker.

Считаем, что рабочие процессы типа Worker работают зверски быстро. Наверное, используя квантовые графические чипы с сигнальными процессорами, объединенными в нейронную сеть.

Практические задачи, которые в первую очередь приходят в голову:
- проверка натуральных чисел "на простоту";
- преобразование изображений к нужному формату;
- генерация биткоинов;
- ...



В нашем примере каждый процесс Worker просто будет "спать" в течении указанного ему времени. Картинка с другого сайта..
Причем, спать они начнут - по команде. После того как все построятся запустятся.

Кто здесь "сервер", кто "клиент"? Имхо, те, чьи сетевые адреса более-менее фиксированы. В данном случае - это процессы Ventilator и Sink. Хотя задачи выполнять будут все же процессы Work... клиенты... Хм.

Итак.
Ventilator сообщает системе ZeroMQ, что к нему можно цепляться по tcp к порту 5557, через который он будет раздавать задания. К этому порту будут цепляться процессы Worker, которые могут располагаться где угодно в сети. Всем, кто прицепился, будет отправлено задание, каждому - свое.
Sink сообщает системе, что он готов принимать сообщения по tcp на порт 5558. В этот порт процессы Worker будут посылать сообщения о завершении очередной задачи. Туда же будет посылать сигнал Ventilator о начале процесса пакетной обработки.

Поехали, создаем Ventilator.

program PL_Ventilator;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ_h;
const
  c_task_count = 100;
var
  fContext: Pointer;
  fSocketSender: Pointer;
  fSocketSink: Pointer;

  fTotal_Time: Cardinal;
  fWork_Load: Cardinal;
  i: Integer;
  fStrMsg: string;

begin
  // Инициализация
  fContext := zmq_ctx_new();
  // Сокет, рассылающий задания рабочим процессам
  fSocketSender := zmq_socket(fContext, ZMQ_PUSH);
  zmq_bind(fSocketSender, 'tcp://*:5557');

  // Сокет, отсылающий сборщику результатов сигнал о начале пакетной обработки
  fSocketSink := zmq_socket(fContext, ZMQ_PUSH);
  zmq_connect(fSocketSink, 'tcp://localhost:5558');

  Writeln('Press Enter when the workers are ready:');
  Readln(fStrMsg);

  Writeln('Sending tasks to workers...');

 // Первое сообщение - '0', просто сигнад начинать прием и обработку заданий
  fStrMsg := '0';
  zmq_send(fSocketSink, PChar(fStrMsg), Length(fStrMsg) * SizeOf(Char), 0);

  Randomize;

 // Send 100 tasks
  fTotal_Time := 0;
    // Суммарная оценка времени выполнения всех заданий в ms
  for i := 0 to Pred(c_task_count) do
  begin
    // Случайное значение fWork_Load от 1 to 100 ms
    fWork_Load := Random(100) + 1;
    fTotal_Time := fTotal_Time + fWork_Load;
      // Накопление значения суммарной общего времени
    fStrMsg := IntToStr(fWork_Load);
    zmq_send(fSocketSender, PChar(fStrMsg), Length(fStrMsg) + SizeOf(Char), 0);
  end;
  Writeln('Total expected time:', fTotal_Time, 'ms');
  sleep(1000); // Даем системе 0MQ время на доставку

  zmq_close(fSocketSink);
  zmq_close(fSocketSender);
  zmq_ctx_destroy(fContext);
  Readln;

end.

Пояснения.
В соответствии с заданием, создаем два сокета. fSocketSender - через него будут отправляться задания. fSocketSink - на него сборщику результатов будет отправлен сигнал о начале пакетной обработки.
После запуска "Ventilator"-а следует запустить некоторое число процессов "Worker" и один процесс - сборщик "Sink". Ну и нажать Enter в консоли процесса "Ventilator". :)
После этого сборщику ("Sink") отсылается сигнал о начале процесса пакетной обработки. И в цикле, 100 раз на сокет fSocketSender отправляются сообщения - задания ("спать ... мсек").
Сообщение генерируется как случайное значение от 1 до 100 и пересылается в виде строки (просто так). При этом, задания распределяются циклически, (round-robin), равномерно нагружая процессы "Worker".
Таким образом, процессов "Worker" должно быть не менее одного. Если процессов "Worker" доступных нет, рассылка блокируется (процесс "Ventilator" переходит в синхронный режим).

Далее, процесс "Worker":
 
program PL_Worker;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ_h;

var
  fContext: Pointer;
  fSocketReceiver: Pointer;
  fSocketSender: Pointer;
  fStrMsg: string;
  fLen: Integer;
  fMsg: zmq_msg_t;
begin
  fContext := zmq_ctx_new();

 // Сокет для приема сообщений
  fSocketReceiver := zmq_socket(fContext, ZMQ_PULL);
  zmq_connect(fSocketReceiver, 'tcp://localhost:5557');

// Сокет для отправки сообщений
  fSocketSender := zmq_socket(fContext, ZMQ_PUSH);
  zmq_connect(fSocketSender, 'tcp://localhost:5558');

// Бесконечный цикл выполнения заданий
  while True do begin
    zmq_msg_init(@fMsg);
    fLen := zmq_msg_recv(@fMsg, fSocketReceiver, 0); // Получение задания

    SetLength(fStrMsg, fLen div SizeOf(Char));   // Перевод буфера сообщения в строку
    Move(zmq_msg_data(@fMsg)^, PChar(fStrMsg)^, fLen);

    Writeln(fStrMsg); // Отображение в консоли процесса "работы"

    Sleep(StrToInt(fStrMsg)); // "Полезная" работа...

    zmq_send(fSocketSender, nil, 0, 0); // Отправка сигнала сборщику результата
  end;
  zmq_close(fSocketReceiver);
  zmq_close(fSocketSender);
  zmq_ctx_destroy(fContext);
  Readln;

end.

Пояснения.

Процесс Worker подключается к двум известным tcp адресам - к процессу Ventilator:

  fSocketReceiver := zmq_socket(fContext, ZMQ_PULL);
  zmq_connect(fSocketReceiver, 'tcp://localhost:5557');
... и к процессу "Sink":
  fSocketSender := zmq_socket(fContext, ZMQ_PUSH);
  zmq_connect(fSocketSender, 'tcp://localhost:5558');

Затем в бесконечном цикле получает сообщения от процесса Ventilator, копирует их в строку, выводит в консоль и выполняет задание ("спит"), по звершению которого отсылает сигнал сборщику результатов:

    zmq_msg_init(@fMsg);
    fLen := zmq_msg_recv(@fMsg, fSocketReceiver, 0); // Получение задания

    SetLength(fStrMsg, fLen div SizeOf(Char));   // Перевод буфера сообщения в строку
    Move(zmq_msg_data(@fMsg)^, PChar(fStrMsg)^, fLen);

    Writeln(fStrMsg); // Отображение в консоли процесса "работы"

    Sleep(StrToInt(fStrMsg)); // "Полезная" работа...

    zmq_send(fSocketSender, nil, 0, 0); // Отправка сигнала сборщику результата

Сборщику результатов ничего полезного не отправляют (сообщение длиной ноль), просто сигнал как факт.

Последнее: код сборщика сообщений Sink.

 
program PL_Sink;

{$APPTYPE CONSOLE}

uses
  SysUtils, Windows, ZMQ;

const
  c_task_count = 100;

var
  fContext: Pointer;
  fSocketReceiver: Pointer;
  fStart: Cardinal;
  fTaskCount: Integer;
  fDummy: string;
begin
 // Инициализация
  fContext := zmq_ctx_new();

  // Сокет для приема сигналов
  fSocketReceiver := zmq_socket(fContext, ZMQ_PULL);
  zmq_bind(fSocketReceiver, 'tcp://*:5558');

  // Ожидание первого сигнала о старте пакета задач
  zmq_recv(fSocketReceiver, PChar(nil)^, 0, 0);

  // Фиксация времени начала выполнения пакета
  fStart := GetTickCount();
  // Ждем подтверждения выполнения от 100 рабочих процессов
  for fTaskCount := 0 to Pred(c_task_count) do begin
    zmq_recv(fSocketReceiver, PChar(nil)^, 0, 0);

    if (fTaskCount mod 10 = 0) then // "Статус - бар" :)
      Write(':')
    else
      Write('.')
  end;
// Вычисление и показ времени выполнения пакета задач
  Writeln('Total elapsed time: ', GetTickCount() - fStart, 'ms');

  zmq_close(fSocketReceiver);
  zmq_ctx_destroy(fContext);
  Readln(fDummy);

end.

Тоже все просто. Сборщик слушает сокет:

  fSocketReceiver := zmq_socket(fContext, ZMQ_PULL);
  zmq_bind(fSocketReceiver, 'tcp://*:5558');

Первое сообщение он получает от процесса Ventilator, чтобы зафиксировать время начала пакетной обработки.
Далее сообщения поступают от исполнителей Worker. Содержание сообщений нас не интересует, поэтому длина ожидаемого сообщения указана равной нулю.

 После приема 100 сообщений на консоль выводится общее время выполнения:

  Writeln('Total elapsed time: ', GetTickCount() - fStart, 'ms');

Компилируем, проверяем.
Запускаем все три приложения, в консоли приложения Ventilator жмем Enter, наблюдаем процесс исполнения 100 задач.
Видим, что расчетное время выполнения примерно равно реальному:



Если же запустить не один, а три экземпляра приложения Worker, то станет видно, что общее время выполнения задания из 100 задач уменьшилась чуть ли не втрое:



Для 5 запущенных процессов Worker время уменьшится ещё больше:




Таким образом, библиотека ZeroMQ предоставляет простое средство в том числе и для создания системы распределенных вычислений.
Вы создаете систему, которую обслуживают, изначально одна вычислительная установки Worker. Если не хватает мощности - вы просто подключаете еще две, прямо без прерывания рабочего процесса, и обслуживает вас не одна, а уже три установки.

 В общем случае, установки могут находиться где угодно. Главное - чтобы они могли подключиться к процессам Ventilator и Sink по tcp.
То есть, мы получаем то, о чем любят говорить "энерпрайзники": масштабирование в ширину.

Когда мощность системы увеличивается простым наращиванием (конечно, до определенного уровня) параллельных элементов.

По поводу процедуры синхронизации.

Worker коннектятся с помощью метода zmq_connect(), который выполняется не мгновенно. Поэтому, если Ventilator начнет раздавать задачи сразу, то первые из подключившихся получат "под завязку", а последние так и не загрузятся.
Это связано в том числе и с тем, что сообщения встают в очередь как на стороне Ventilator, так и на стороне Worker. PUSH - сокет процесса Ventilator, равномерно распределяет задания по процессам Worker (если последние успели проконнектиться к Ventilator).

Это называется "балансировка нагрузки".  

PULL - сокет сборщика (процесса Sink) собирает результаты от процессов Worker равномерно. Процесс называется "справедливой очередью" (fair-queuing).

Соединение по схеме "Трубопровод" (Pipeline) так же страдает синдромом "запаздывания подключения". Это приводит к тому, что PUSH - сокеты не могут правильно балансировать нагрузку.
Если используются сокеты PUSH и PULL и какой-то из рабочих процессов (вроде рассмотренного выше Worker) получает больше сообщений, чем другие, то это потому, что PULL-сокет коннектится быстрее, чем другие сокеты и выгребает все сообщения еще до того, как законнектятся все остальные.

 В общем, разбираться надо. Тем более ,что в документации есть целый раздел, посвященный управлению балансировкой нагрузки. Дальше - немного теории. Совсем чуть-чуть  (продолжение).

1 комментарий :

  1. Огромное вам спасибо за проделанную работу. Тем более, что в сети мало что можно найти по сабжу на русском. Скажите, а VCL компоненты Вам не попадались, или может Вы сами что то делали. Было бы интересно ознакомиться. Еще раз спасибо, подписка на ваш блог оформлена )

    ОтветитьУдалить