воскресенье, 2 ноября 2014 г.

08. ZeroMQ. Мультитрейдинг. Многонитевые приложения.

(Начало здесь).



ZeroMQ предоставляет простой способ создания многонитевых приложений.
При этом не потребуются ни мьютексы, ни блокировки, ни прочие дела для организации межнитевого взаимодействия, кроме сообщений, отправляемых через сокеты ZMQ.

Есть одно железное правило для успешного многонитевого кодинга - "не разделять между нитми изменяющиеся данные". Обычно, когда две нити в приложении пытаются между собой разделять данные, то это выглядит как будто два алкаша пытаются разделить пиво. И чем больше алкашей собирается за столом, тем хуже ситуация.
Большинство многонитевых приложений похоже на сборище пьяных алоголиков, учинивших драку в баре.

Microsoft даже опубликовала статью "Решение 11 вероятных проблем в вашем многонитевом коде", в которой упомянуты всяческие ужастики вроде забытой синхронизации, незаблокированной модификации общих данных, и проч.

Для беспроблемного написания многонитевого кода с помощью ZeroMQ следует руководствоваться следующими правилами:


  • Изолируйте данные внутри нити и никогда не разделяйте их между нитями. Единственным исключением являются контексты ZeroMQ, которые являются threadsafe.
  • Держитесь подальше от классических механизмов параллелизма, таких как мьютексы, критические секции, семафоры и т.д. Это анти-паттерны в приложениях ZeroMQ.
  • Создайте один ZeroMQ контекст в начале вашего процесса и передавайте его всем создаваемым нитям, с которыми будете взаимодействовать через InProc сокеты ZMQ.
  • Для создания структуры вашего приложения используйте присоединяемые (attach) нити, и соедините их с их родительскими нитями через сокеты PAIR по протоколу InProc. Порядок работы: привязываем (zmq_bind()) сокет, а затем создаем дочернюю нить, которая коннектится к сокету родительской нити.
  • Используйте отсоединенные (detach) нити для имитации работы самостоятельных задач, с учетом своих условий. Подключите их по протоколу TCP. Позже вы можете переместить их в автономные процессы без значительного изменения кода.
  • Все взаимодействия между нитями происходит как обмене ZeroMQ сообщения, которые вы можете определить более или менее формально.
  • Не разделять сокеты ZeroMQ между нитями. Сокеты ZeroMQ не threadsafe . Технически есть возможность передачи сокета от одной нити к другой, но это требует навыка. Единственное место, где разумно и оправдано разделение сокетов между нитями - это удаление сокетов в деструкторах ваших классовых оберток над ZMQ.
  • При создании многонитевого Delphi приложения обязательно задавать значение True для переменной System.IsMultyThread. Но этого делать не нужно, если прямо или косвенно вызывается процедура BeginThread, в которой также задается это значение.

 Например:
    Пример.

    Предположим, архитектура вашего приложения требует, чтобы в приложении было более одного прокси, и вы хотите, чтобы каждый из них выполнятся в своей нити. Очень легко допустить ошибку, создав фронтэнд и бэкэнд сокеты таких прокси в одной нити, а затем передав сокеты в прокси (который живет в другой нити). Возможно, на первый раз все заработает, но глюки - неминуемы, причем в совершенно произвольные моменты времени.

    Запоминаем: не используем (и не закрываем) сокеты нигде, кроме как нитях, их создавших.

    Если следовать перечисленным правилам, можно легко создавать надежные многонитевые приложения. Логика приложения может размещаться в нитях, процессах, или узлах сети, в соответствии с текущими планами по захвату мира.

    ZeroMQ использует нативные нити ОС (Windows), а не виртуальные "зеленые" нити. Для отладки можно использовать стандартные средства, вроде ThreadChecker от Intel, чтобы увидеть, что ваше приложение делает. К недостаткам использования нативных нитей можно отнести, что API-интерфейсы "родного" мультитрейдинга конкретной ОС не всегда портируются, и, к примеру, если вы используете огромное количество (тысячи) нитей, то некоторые ОС просто не потянут такой нагрузки.
    ...
    Перейдем к практике. Превратим наш старый сервер, в нечто более работоспособное.
    Старый сервер был однонитевым. Если обслуживание каждого запроса было легким, то это нормально: одна ZMQ нить может работать на полной скорости ядра процессора без ожидания и выполнять очень много работы.
    Но серверы из реальной жизни на каждый запрос делают более сложную работу. Одноядерного сервера может оказаться недостаточно, когда по серверу жахнет сразу 10000 клиентов. Сервер из реальной жизни будет создавать несколько рабочих нитей. После чего он будет принимать запросы так быстро, как это возможно и раздавать их своим своих рабочим нитям.
    Рабочие нити будут "перемалывать цифирь" и отправлять результаты обратно.

    Конечно, это можно сделать с помощью прокси-брокера и внешних рабочих процессов, но часто легче начать один процесс, который займет все шестнадцать ядер, чем шестнадцать процессов, каждый из которых жрет одно ядро. Кроме того, рабочие процессы будут занимать линии связи и поглощать сетевой трафик и просто тормозить.

    Топология системы:



    Код многонитевого сервиса (сервера):

      
    
    program RR_Service;
    
    {$APPTYPE CONSOLE}
    // Многонитевой сервис (сервер)
    // Находится в известной клиентам конечной точке, "слушает" tcp порт 5555,
    // к которому привязан сокет ZMQ_ROUTER
    // Получает целое число, возводит в квадрат и отправляет обратно результат
    
    uses
      SysUtils, ZMQ_h;
    
    procedure Worker_Routine(aContext: Pointer); // Процедура нити
    var
      fSocketReceiver: Pointer;
      fSrcVal: Cardinal;
      fResultVal: UInt64;
      i: integer;
    begin
    // Сокет для общения с диспетчером
      fSocketReceiver := zmq_socket(aContext, ZMQ_REP);
      zmq_connect(fSocketReceiver, 'inproc://worker');
      i := 0;
      while True do
      begin
        zmq_recv(fSocketReceiver, @fSrcVal, SizeOf(fSrcVal), 0); // Запрос
        fResultVal := UInt64(fSrcVal) * UInt64(fSrcVal); // "Полезная работа"
        zmq_send(fSocketReceiver, @fResultVal, SizeOf(fResultVal), 0); // Ответ
        Writeln('In thread, Iter: ', i, ' src=', fSrcVal, ' result=', fResultVal);
        Inc(i);
      end;
      zmq_close(fSocketReceiver);
    end;
    
    var
      fContext: Pointer;
      fSocketClients: Pointer;
      fSocketWorkers: Pointer;
      i: integer;
      fThreadId: Cardinal;
    begin
      fContext := zmq_ctx_new(); // Инициализация
      fSocketClients := zmq_socket(fContext, ZMQ_ROUTER); // Создание сокетов
      fSocketWorkers := zmq_socket(fContext, ZMQ_DEALER);
      zmq_bind(fSocketClients, 'tcp://*:5555');
        // Привязка к конечным точкам
      zmq_bind(fSocketWorkers, 'inproc://worker');
    
      Writeln('Starting service...');
      for i := 0 to 2 do // Запуск пула рабочих нитей
        BeginThread(nil, 0, @Worker_Routine, fContext, 0, fThreadId);
      // Соединение рабочих нитей с нитями клиентов через очередь:
      zmq_proxy(fSocketClients, fSocketWorkers, nil);
    
      zmq_close(fSocketClients);
      zmq_close(fSocketWorkers);
      zmq_ctx_destroy(fContext);
    
      Readln;
    
    end.
    
    
    

    Клиент не изменится:
      
    
    
    program RR_Client;
    
    {$APPTYPE CONSOLE}
    // Клиент
    // Коннектится сокетом REQ к tcp://localhost:5555
    // Шлет целое число сервису (серверу), обратно получает квадрат числа
    
    uses
      SysUtils, ZMQ_h;
    const
      c_ReqCnt = 100;
    var
      fContext: Pointer;
      fSocketRequester: Pointer;
      fSrcVal: Cardinal;
      fResultVal: UInt64;
      i: integer;
    begin
      fContext := zmq_ctx_new(); // Инициализация
      fSocketRequester := zmq_socket(fContext, ZMQ_REQ);
      zmq_connect(fSocketRequester, 'tcp://localhost:5555');
        // Коннект к сервису
      Randomize();
      for i := 0 to Pred(c_ReqCnt) do begin
        fSrcVal := Cardinal(Random(-1));
          // Генерация случайного целого
        zmq_send(fSocketRequester, @fSrcVal, SizeOf(fSrcVal), 0); // Запрос
        Write('Iter: ', i, ' src=', fSrcVal);
        zmq_recv(fSocketRequester, @fResultVal, SizeOf(fResultVal), 0); // Ответ
        Writeln(' result=', fResultVal);
      end;
      zmq_close(fSocketRequester);
      zmq_ctx_destroy(fContext);
      Readln;
    
    end.
    
    

    Прежде всего, интересен сервис ("сервер"). Основная нить приложения поочередно транслирует запросы от сокета fSocketClients (сокет типа ZMQ_ROUTER) к сокету fSocketWorkers (сокет типа ZMQ_DEALER) и обратно, используя знакомый метод zmq_proxy().
    То есть, основная нить является брокером ("прокси") сообщений.
    Для обработки данных запущено три нити. Процедура нити - Worker_Routine(). В процедуре нити создается сокет fSocketReceiver типа (ZMQ_REP). Сокет fSocketReceiver рабочей нити связан с сокетом fSocketWorkers основной нити по inproc протоколу.

    Сокет рабочей нити:
      
    
    // Сокет для общения с диспетчером
      fSocketReceiver := zmq_socket(aContext, ZMQ_REP);
      zmq_connect(fSocketReceiver, 'inproc://worker');
    
    

    Сокет основной нити:
      
    
      fSocketWorkers := zmq_socket(fContext, ZMQ_DEALER);
      fRez := zmq_bind(fSocketWorkers, 'inproc://worker');// Привязка к конечной точке
    
    
    Естественно, вместо inproc - протокола можно было бы использовать tcp, но пришлось бы занять порт и, кроме того, inproc гораздо быстрее.

    Еще раз: работа сервиса.


    Сервис запускает несколько рабочих нитей. Каждая рабочая нить создает сокет типа REP и затем в цикле обрабатывает запросы к сокету.
    Рабочие нити логически представляют собой обычные однонитевые сервисы, разница - в транспорте (inproc вместо tcp), и в направлении операции "привязать - подключить" (bind-connect).

    Сервер создает сокет типа ROUTER, чтобы общаться с клиентами и связывает его (сокет) с внешним интерфейсом по tcp.

    Сервис создает сокет типа DEALER, чтобы общаться с рабочими процессами и связывает этот сокет с внутренним интерфейсом (с помощью inproc).

    Сервер зарускает прокси, который соединяет оба сокета. Прокси получает входящие запросы от всех клиентов и распределяет эти запросы между рабочими нитями. При этом он правильно выполняет маршрутизацию ответов так, чтобы они вернулись туда, откуда приходили запросы.

    То есть, цепочка запрос - ответ выглядит так: REQ-ROUTER-очередь-DEALER-REP.

    В выводе сервиса видно, что номера итераций повторяются по три раза - по числу запущенных нитей:


     Обмен сигналами между нитями.


    Пора рассмотреть сокеты типа PAIR.

    Пример:

    Приложение в "основной нити" создает нить 2 и ждет сигнала о выполнении какой-то полезной работы, потом выполняет Step 3.
    Нить 2 создает нить 3 и ждет сигнала о выполнении какой-то полезной работы, затем посылает сигнал "основной" нити, затем выполняет Step 2.
    Нить 3 выполняет некоторую полезную работу (Step 1) и посылает сигнал нити 2.

    Отправка - прием сигналов будет выполняться с помощью сокетов типа PAIR по inproc протоколу.

    И так, исходник многонитевого приложения:
      
    
    program SgnlBtwThrds;
    
    {$APPTYPE CONSOLE}
    
    uses
      SysUtils
      , ZMQ_h
      , ZMQ_Utils // Тут вспомогательные вещи, вроде s_send() и s_recv()
      , Windows;
    
    
    function Thread_Step_1(aContext: Pointer): integer;
    var
      fSocketXmitter: Pointer;
    begin
      Result := 0;
     // Подключается к Thread_Step_2 и сообщает о готовности
      fSocketXmitter := zmq_socket(aContext, ZMQ_PAIR);
      zmq_connect(fSocketXmitter, 'inproc://step2');
      Writeln('Step 1 ready, signaling step 2');
      s_send(fSocketXmitter, 'READY');
      zmq_close(fSocketXmitter);
    end;
    
    
    function Thread_Step_2(aContext: Pointer): integer;
    var
      fDummy: string;
      fSocketReceiver: Pointer;
      fSocketXmitter: Pointer;
      fThreadId: Cardinal;
    begin
      Result := 0;
     // Связыывает inproc сокет перед запуском
      fSocketReceiver := zmq_socket(aContext, ZMQ_PAIR);
      zmq_bind(fSocketReceiver, 'inproc://step2');
      BeginThread(nil, 0, @Thread_Step_1, aContext, 0, fThreadId);
    
    // Ожидание сигнала
      fDummy := s_recv(fSocketReceiver);
      zmq_close(fSocketReceiver);
    
    // Коннект к step3 сообщение о готовности
      fSocketXmitter := zmq_socket(aContext, ZMQ_PAIR);
      zmq_connect(fSocketXmitter, 'inproc://step3');
      Writeln('Step 2 ready, signaling step 3');
      s_send(fSocketXmitter, 'READY');
      zmq_close(fSocketXmitter);
    end;
    
    procedure Main_Step_3;
    var
      fContext: Pointer;
      fSocketReceiver: Pointer;
      fThreadId: Cardinal;
      fDummy: string;
    begin
      fContext := zmq_ctx_new();
    
      fSocketReceiver := zmq_socket(fContext, ZMQ_PAIR);  // Сокет для приема сигнала
      zmq_bind(fSocketReceiver, 'inproc://step3');
      BeginThread(nil, 0, @Thread_Step_2, fContext, 0, fThreadId);
    
      fDummy := s_recv(fSocketReceiver);
      zmq_close(fSocketReceiver);
    
      Writeln('Step 3 ready!');
      Writeln;
      Writeln('Test successful!');
      zmq_ctx_destroy(fContext);
    end;
    
    begin
      Main_Step_3;
      Readln;
    
    end. 
     
    Запускаем, наблюдаем: 
    
    

    Неземная красота... :)

    И так, выше был представлен образец классического многонитевого приложения ZeroMQ:

    • Две нити взаимодействуют через InProc, используя общий контекст.
    • Родительская нить создает один сокет, связывает его с конкретной конечной точкой по InProc, а затем запускает дочернюю нить, передавая ей контекст.
    • Дочерняя нить создает второй сокет, соединяет его с той же конкретной конечной точкой по InProc и по готовности сигнализирует родительской нити.


    Обращаем внимание, что многонитевой код, используемый в данной схеме, не масштабируется за пределы процесса: если используется протокол InProc и сокеты типа PAIR, значит, строится сильносвязная система, в которой есть взаимозависымые структуры. Такие вещи следует делать, когда нужна высокая скорость взаимодействия между компонентами системы.
    Если использовать схему с протоколом TCP и использовать собственный контекст в каждой нити, система будет менее связной и позволит в будущем легко масштабироваться методом вычленения узлов в отдельные процессы.
    ~~~~~~~~~~~~~

    Почему были использованы сокеты типа PAIR? Потому, что использование сокетов других типов имеет нежелательные побочные эффекты:
    • Можно использовать рассмотренные ранее PUSH для отправителя и PULL для приемника. Это выглядит просто и будет работать, однако следует помнить, что PUSH будет распределять сообщения по всем доступным приемникам. Если вы случайно запустили два приемника (например, создали еще одна нить с такой же процедурой нити), то вы сразу "потеряете" половину ваших сигналов. Преимущество сокетов PAIR в том, что они не позволят создать больше одного соединения; пара же сокетов типа PAIR - является эксклюзивной.
    • Вы можете использовать DEALER для отправителя и ROUTER для приемника. ROUTER, однако, упаковывает сообщение в "конверт", т.обр. ваш сигнал нулевого размера превращается в составное сообщение. Это несущественно, если вы не заботитесь о самих данных, а посылаете только сигнал. Однако, если понадобится отправить реальные данные, то обнаружится, что ROUTER прислал вам "неправильные" сообщения. Кроме того, DEALER точно так же как и PUSH, распределяет исходящие сообщения между всеми приемниками , т.е. тут такой же риск потери сообщений, как и при использовании PUSH.
    • Вы можете использовать PUB для отправителя и SUB для приемника. Эта схема будет правильно доставлять сообщения, и PUB не разбросает их по приемникам, как DEALER или PUSH. Тем неменее, вам придется все время настривать приемник на подписку, что утомительно.
    По этим причинам, пара сокетов типа PAIR - лучший выбор для пересылки сигналов координации между парами нитей в приложении.

    Что еще? Вот что. Появились новые методы: s_send() и s_recv(), которые определены в модуле ZMQ_Utils.pas. Мне просто надоела возня с упаковкой и распаковкой строк в сообщения ZMQ и обратно, и я добавил пару вспомогательных методов:

      
    
    unit ZMQ_Utils;
    
    interface
    uses ZMQ_h;
    
    // Возвращает длину отправленного сообщения в байтах
    function s_recv(aZMQSocket: Pointer; aFlags: integer = 0): string;
    
    // Отправляет строку Delphi в сокет. Возвращает число отправленных байт.
    function s_send(aZMQSocket: Pointer; const aSrcString: string;
      aFlags: integer = 0): integer;
    ...
    implementation
    uses SysUtils, Windows, Math;
    ...
    
    function s_recv(aZMQSocket: Pointer; aFlags: integer = 0): string;
    //  Читает строку ZMQ из сокета и преобразует её в строку Delphi
    //  В сокете должна быть именно строка Delphi. Возвращает пустую строку
    //  если контекст ZMQ был завершен.
    var
      fLen: Integer;
      fZMQMsg: zmq_msg_t;
    begin
    
      Result := '';
      try
        zmq_msg_init(@fZMQMsg);
        fLen := zmq_msg_recv(@fZMQMsg, aZMQSocket, aFlags);
        if fLen <= 0 then
          Exit;
        SetLength(Result, fLen div SizeOf(Char));
        Move(zmq_msg_data(@fZMQMsg)^, PChar(Result)^, fLen div SizeOf(Char));
      finally
        zmq_msg_close(@fZMQMsg);
      end;
    end;
    
    function s_send(aZMQSocket: Pointer; const aSrcString: string;
      aFlags: integer = 0): integer;
        // Возвращает длину отправленного сообщения в байтах
    var
      fZMQMsg: zmq_msg_t;
    begin
      zmq_msg_init(@fZMQMsg);
      if Length(aSrcString) > 0 then begin
        zmq_msg_init_size(@fZMQMsg, Length(aSrcString) * SizeOf(Char));
        Move(PChar(aSrcString)^, zmq_msg_data(@fZMQMsg)^, Length(aSrcString) *
          SizeOf(Char));
      end;
      Result := zmq_msg_send(@fZMQMsg, aZMQSocket, aFlags);
    end;
    ...
    
    
    
    В ZMQ_Utils.pas добавлены еще разные разности, о которых я расскажу позднее.

    Теперь поговорим о согласовании работы разных элементов сети. (Продолжение)

    Комментариев нет :

    Отправить комментарий