понедельник, 7 декабря 2015 г.

23.3 ZeroMQ: Пересылка файлов. Модель №3 - клиент использует конвейерное управление потоком на основе кредитования сервера запросами.

Начало - здесь.

Сервер может отсылать по 10 кусков за раз, затем ждать однократного подтверждения.
Это, в общем, бессмысленно: все равно что умножать размер куска на 10.
Сервер может отправлять клиенту порции файла без перезапросов от клиента, периодически делая небольшие паузы, чтобы загрузить сеть настолько, насколько она может справиться. Для этого сервер должен знать, что происходит с сетью. Получение такой информации представляется непростой задачей. Кроме того, непонятно, что делать, когда сеть быстрая, а клиенты медленные. Кто будет заниматься буферизацией сообщений?
Сервер мог бы отслеживать состояние исходящей очереди и отправлять сообщения тогда, когда в очереди есть свободное место. Но ZeroMQ не позволяет таких вольностей. И сервер, и сеть могут оказаться достаточно быстрыми, а клиент - оказаться маленьким медленным устройством.
Можно, в конце концов, модифицировать libzmq так, чтобы изменить поведение при достижении границы HWM. Возможно, нужно блокировать новые сообщения. Тоже не очень здорово: один маленький медленный клиент заблокирует весь сервер.
Можно попробовать возвращать клиенту сообщение с ошибкой. Тогда усложнится сервер. Ох. Пока самое лучшее, что может сделать сервер - это отбросить сообщение.
В общем, все эти варианты либо усложняют логику, либо вообще легко приводят систему в нерабочее состояние
Все, что нам нужно - это дать возможность клиенту возможность сообщать серверу о своей готовности к работе. Если мы все сделаем правильно, данные будут поступать к клиенту непрерывным потоком, но лишь тогда, когда клиент будет готов их принять.



Вернемся к нашим моделям протоколов.

Модель 1.

Клиент: Запрос
Сервер: блок 1
Сервер: блок 2
Сервер: блок 3
...

Во второй версии протокола каждый блок запрашивался отдельно:

Модель 2.
Клиент: Запрос блока 1
Сервер: блок 1
Клиент: Запрос блока 2
Сервер: блок 2
Клиент: Запрос блока 3
Сервер: блок 3
....
Новый протокол должен исправить проблемы производительности:
Модель 3.
Клиент: Запрос блока 1
Клиент: Запрос блока 2 Клиент: Запрос блока 3
Сервер: блок 1
Клиент: Запрос блока 4
Сервер: блок 2
Сервер: блок 3 ....
Вроде бы протокол 3 похож на предыдущие. Однако, есть отличия: клиент шлет множество запросов, не дожидаясь ответов. Такая техника называется "конвейеризацией" (pipelining). Она работает, так как мы используем асинхронные сокеты DEALER/ROUTER.
И так, модель протокола № 3. Клиент посылает несколько запросов (запросы "в кредит"), затем, обрабатывая получаемые порции, он снова посылает запросы. В итоге, сервер никогда не отправит больше, чем запросил клиент.
Код:
program Model3_CreditBasedPipelining;
//  Пересылка файлов, модель #3
//  Клиент запрашивает по одному куску файла, используя
//  конвейерное управление потоком на основе кредитования сервера запросами.

{$APPTYPE CONSOLE}

uses
  SysUtils,
  Classes,
  zmq_h,
  czmq_h,
  ZMQ_Utils,
  Windows;

const
  cTestDataFileName = 'TestData.tst';
  cTCPAddress = 'tcp://%s:6000';
  cCHUNK_SIZE = 250000; // Размер куска
  cPIPELINE = 10; // Число порций в конвейере

procedure client_thread(args: Pointer; ctx: p_zctx_t; pipe: Pointer); cdecl;
//  Клиентская нить. Каждая порция файла запрашивается отдельно.
//  В итоге исчезает перегрузка сервера, но падает скорость.
var
  chunk: p_zframe_t;
  chunks: size_t;
  credit: size_t;
  dealer: Pointer;
  fDummy: Integer;
  offset: size_t;
  size: size_t;
  total: size_t;
begin
  dealer := zsocket_new(ctx, ZMQ_DEALER);
  fDummy := zsocket_connect(dealer, PChar(Format(cTCPAddress, ['127.0.0.1']))); // Например

  total := 0; //  Всего байт принято
  chunks := 0; //  Всего кусков принято
  offset := 0; //  Смещение начала нового куска относительно начала
  credit := cPIPELINE; // Длина конвейера
  while (true) do begin
    while credit <> 0 do begin
  // Запрос следующего куска

  // Сообщение из 3х фреймов:
      zstr_sendm(dealer, 'fetch'); // Запрос на чтение
      zstr_sendfm(dealer, '%ld', offset); // Начало блока в файле
      zstr_sendf(dealer, '%ld', cCHUNK_SIZE); // Длина блока
      Inc(offset, cCHUNK_SIZE);
      Dec(credit);
    end;
    chunk := zframe_recv(dealer);
    if chunk = nil then
      break; //  Завершение
    Inc(chunks);
    Inc(credit);
    size := zframe_size(chunk);
    zframe_destroy(chunk);
    Inc(total, size);
    if size < cCHUNK_SIZE then
      break; //  Это был последний фрагмент, весь файл принят.

  end;
  Writeln(Format('%d chunks received, %d bytes', [chunks, total]));
  zstr_send(pipe, 'OK');
end;

// Дальше то же самое, что и в Модели 2, кроме установки HWM серверного сокета
// в cPIPELINE, чтобы убедиться в чистоте кода

procedure server_thread(args: Pointer; ctx: p_zctx_t; pipe: Pointer); cdecl;
//  Серверная нить ждет запроса клиента, читает порцию файла,
//  и отправляет ее
var
  chunk: p_zframe_t;
  chunksz: size_t;
  chunksz_str: PChar;
  command: PChar;
  data: pByte;
  fDummy: Integer;
  fFS: TFileStream;
  identity: p_zframe_t;
  offset: size_t;
  offset_str: PChar;
  router: Pointer;
  size: size_t;
begin
  fFS := TFileStream.Create(cTestDataFileName, fmOpenRead);
  try
    router := zsocket_new(ctx, ZMQ_ROUTER);
    zsocket_set_hwm(router, cPIPELINE);
    fDummy := zsocket_bind(router, PChar(Format(cTCPAddress, ['*'])));
    while (true) do begin
      //  Первый фрейм каждого сообщени - идентификатор отправителя
      identity := zframe_recv(router);
      if identity = nil then
        break; //  Завершение
//        zframe_destroy(identity);

      //  Второй фрем - команда "fetch"
      command := zstr_recv(router);
      assert(command = 'fetch');
      zstr_free(command);

      //  Третий фрейм - смещение куска относительно начала файла
      offset_str := zstr_recv(router);
      offset := StrToInt(offset_str);
      zstr_free(offset_str);

      //  Четвертый фрейм - максимальная величина куска
      chunksz_str := zstr_recv(router);
      chunksz := StrToInt(chunksz_str);
      zstr_free(chunksz_str);

      //  Чтение порции данных из файла
      fFS.Position := offset;
      data := safe_malloc(chunksz);
      assert(data <> nil);

      //  Отправление результата клиенту
      size := fFS.Read(data^, chunksz);
      chunk := zframe_new(data, size);
      zframe_send(identity, router, c_ZFRAME_MORE);
      zframe_send(chunk, router, 0);

    end;
  finally
    fFS.Free;
  end;
end;

procedure DoIt;
//  Основная задача запускает нити клиента и сервера:
//  с одним процессом с нитями легче тестировать, чем с несколькими процессами.
var
  client: Pointer;
  ctx: p_zctx_t;
  fStr: PChar;
  fTime: Cardinal;
begin
  fTime := GetTickCount;
  //  Старт серверной и клиентской нитей
  ctx := zctx_new();
  zthread_fork(ctx, @server_thread, nil);
  client := zthread_fork(ctx, @client_thread, nil);
  //  Цикл ожидания работы клиента
  fStr := zstr_recv(client);
  zstr_free(fStr);
  //  Уничтожение серверной нити
  zctx_destroy(ctx);
  Writeln(GetTickCount - fTime, ' msec')
end;
begin
  doIt;
  readln;
end.

Время работы: примерно 4300 миллисекунд. Совсем другое дело! :)

Конечно, даже этот протокол №3 не годится для реального применения. Ибо:
- должно быть реализовано управление доступом и проверка подлинности (пусть даже без шифрования трафика) - для того, чтобы хотя бы отсекать доступ к важным данным. Например, при запуске тестов на рабочих серверах.
- запрос должен включать путь к файлу, уровень сжатия (опционально) и прочие важные вещи, вроде даты модификации файла. В качестве примера можно посмотреть на реализацию протокола HTTP.
- ответ должен содержать как минимум первый фрагмент, который содержит разные метаданные, вроде размера файла (клиент сможет зарезервировать место чтобы избежать ситуации "диск переполнен") и т.д.
- возможность получить набор файлов за один запрос в случае, когда файлы мелкие (иначе для мелких файлов протокол может стать неэффективным).
подтверждение от клиента, что файл принят полностью, чтобы повторно передать блоки, потерянные из-за обрывов связи.

Такие дела - (c).

~~~~~~~~~~~~~~~~~~~~~~~~~

Была рассмотрена семантика "загрузка" ("fetch"). Если клиент знает, что ему нужно, и что на сервере где лежит, можно вообще использовать другие механизмы: HTTP, FTP и т.п.
А ведь может быть и семантика "выгрузка" ("push"). Например, для централизованной системы, когда файлы сохраняются на главном "сервере". Или для системы обновления файлов типа "издатель-подписчик", когда клиенту нужны новые файлы определенного типа. Сервер, получив запрос на такую подписку, пересылает нужные  файлы клиенту.

Семантика "загрузка" - синхронная.  А семантика "выгрузка" - асинхронная. Асинхронный режим требует меньше обмена данными, он менее затратен и потому более быстрый. Кроме того, мы можем делать такие забавные вещи, как "подписка вот на этот путь", которые реализуют архитектуру передачи файлов типа "издатель-подписчик". Этот режим очевидно настолько удобен, и может очевидным образом решить столько практических задач, что даже нет смысла это обсуждать.

Тем не менее, с семантикой "загрузка" есть проблемы: требуется как-то сообщать клиентам, что за файлы есть на сервере. Или клиенты будут отдельно запрашивать список, или придется организовать дополнительный канал "издатель-подписчик", или потребуется явно взаимодействовать с пользователем.

Если еще немного подумать об этом, то мы, скорее увидим, что режим "загрузка" представляет собой особый случай режима "издатель - подписчик" с примерно такой схемой:

загрузить файлы по этому пути;
- вот кредитные запросы (повторить).


Чтобы все это заработало, нужно более - менее хорошо представлять, что за кредитные запросы мы будем посылать серверу. Наш трюк с конвейерными запросами на загрузку кусков не прокатит, так как клиент не знает, что там на сервере за файлы и какого они размера. Если клиент говорит "Принеси мне 250000 байт данных", то это должно работать одинаково хорошо как для 1 файла размером 250K, так и для 100 файлов размером по 2500 байт.
И так, общее направление понятно, требуется "лишь" обкатать детали.
...
Как бы то ни было, еще раз отметим, что кредитные запросы позволяют эффективно обойти проблему превышения HWM.


4 комментария :

  1. Привет. а какую версию zmq ты используешься?
    и работает ли оно у тебя под XP
    а то я в коде релиза zeromq-4.1.4
    нашел вызов - if_nametoindex он по MSDN поддерживается только под Vista и выше
    получается они как и nanmsg похоронили поддержку XP?

    ОтветитьУдалить
    Ответы
    1. Привет.
      Использую версию из репозитария разработчиков от 20 ноября 2014 года. В zmq.h она помечена как 4.2.0:
      #define ZMQ_VERSION_MAJOR 4
      #define ZMQ_VERSION_MINOR 2
      #define ZMQ_VERSION_PATCH 0
      ...
      Там if_nametoindex не используется.

      Да, грустная новость.

      Удалить
    2. Сия проблема появилась в 4.1.4, когда пофиксили "does not handle IPv6 link local addresses":
      * Fixed #1661 - does not handle IPv6 link local addresses.
      ...
      Можно попробовать в tcp_address.cpp подправить функцию:
      int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv6_, bool is_src_)
      - просто заменив ее на струю реализацию. Ну, и не использовать tcp адреса в формате Ipv6.

      Соответственно, аналогично подправить czmq (если используется).

      Удалить
    3. Слава Богу, вернули поддержку WinXP: в 4.1.5 (здесь: https://github.com/zeromq/zeromq4-1 ) убрали if_nametoindex, и в windows.hpp снова вернулась строка
      // Set target version to Windows Server 2003, Windows XP/SP1 or higher.
      вместо
      // Set target version to Windows Server 2008, Windows Vista or higher.

      В ветке 4.2 (здесь: https://github.com/zeromq/libzmq ):
      // Set target version to Windows Server 2008, Windows Vista or higher. Windows XP (0x0501) is also supported but without client & server socket types.

      Про сокеты типа client & server (ZMQ_SERVER/ZMQ_CLIENT) - здесь: http://api.zeromq.org/4-2:zmq-socket
      ~~~~~~~~

      Удалить