пятница, 4 декабря 2015 г.

23.1 ZeroMQ: Пересылка файлов. Модель №1 - отправляем сразу весь файл большими порциями.


Задача: переслать файл.


ZMQ "искаропки" отлично справляется с пересылкой сообщений и оповещениями, но вот с пересылкой файлов все не так очевидно. Можно, конечно, тупо попробовать засунуть файл в сообщение целиком.
Однако, есть возражения:
1. Оперативная память, к сожалению, не резиновая.
2. Ладно, отправили по сети файл-сообщение. А сеть оказалась медленной (какой-нибудь древний вай-фай), да еще и неустойчивой. ZMQ, как мы помним, пытается обеспечить доставку сообщения даже в случае сбоев. Итак, несколько попыток пересылки по 1 гигабайту - здорово?
3. А если этот гигантский файл хотят скачать несколько клиентов одновременно? Каждому клиенту выделить буфер нужного размера, отправить и ждать? Свободная память закончится гораздо быстрее. То есть, "правильный" протокол передачи файлов должен учитывать ограниченность размеров ОЗУ.
...
...начинаем строить правильный протокол передачи файлов.



 Подготовка.

 Для тестирования протокола создадим большой файл, состоящий из квазислучайной последовательности байт. Это нужно, чтобы при тестировании избежать возможного эффекта сжатия, данных, который может появиться из-за различных компонентов сети.

program TD;

{$APPTYPE CONSOLE}

uses
  SysUtils, Classes;

const
  cBlockSize = 1024;
  cFileSize = 1024 * 1024 * 1024;
  cTestDataFileName = 'TestData.tst';

procedure DoIt;
var
  fFS: TFileStream;
  i: Integer;
  j: Integer;
  fBuff: array of Integer;
begin
  SetLength(fBuff, cBlockSize);
  Randomize;
  fFS := TFileStream.Create(cTestDataFileName, fmOpenWrite);
  try
    for i := 0 to Pred(cFileSize div (Length(fBuff) * SizeOf(Integer))) do begin
      for j := Low(fBuff) to High(fBuff) do
        fBuff[j] := Random(-1);
      fFS.Write(fBuff[0], Length(fBuff) * SizeOf(Integer))
    end;
  finally
    fFS.Free
  end;
end;
begin
  doIt;
  readln;
end.


Компилируем, запускаем. Появился файл размером 1 Гб.
Проверим, насколько файл "несжимаем": сожмем его с помощью 7z:


Очень хорошо: после обработки 7z файл ничуть не сжался, даже наоборот.

Размер 1Гб достаточен, чтобы почувствовать проблемы, когда такой файл пытаются загрузить несколько клиентов.



Для начала измерим время, требуемое для последовательного чтения файла с диска. Все, что больше - будет временем, которое съест наш протокол пересылки файлов (включая сетевые задержки).


program ReadDataFromHDD_Time;

{$APPTYPE CONSOLE}

uses
  SysUtils, Classes, Windows;

const
  cBlockSize = 1024;
  cTestDataFileName = 'TestData.tst';

procedure DoIt;
var
  fTicks: Cardinal;
  fIFS: TFileStream;
  fBuff: array of Integer;
  fCnt: Integer;
begin
  SetLength(fBuff, cBlockSize);
  fIFS := TFileStream.Create(cTestDataFileName, fmOpenRead);
  try
    fTicks := GetTickCount();
    while fIFS.Read(fBuff[0], Length(fBuff) * SizeOf(Integer)) > 0 do ;
    fTicks := GetTickCount() - fTicks;
    Writeln('Time(msec):', fTicks);
  finally
    fIFS.Free;
  end;


end;
begin
  doIt;
  readln;
end.

Запустим программу несколько раз. У меня получилось в среднем 375 миллисекунд (плюс-минус 25 миллисекунд).  Запомним его

Начнем кодировать непосредственно пересылку файла.

Вариант 1. Клиент запрашивает файл, а сервер кусками без остановки пересылает его. 



Для тестирования снова создадим многонитевое приложение:
  
program Model1_EntireFileByLargeChunks;
//  Пересылка файлов, модель #1
//  Сервер отсылает весь файл клиенту
//  большими кусками без попыток управления потоком

{$APPTYPE CONSOLE}

uses
  FastMM4,
  SysUtils,
  Classes,
  zmq_h,
  czmq_h,
  ZMQ_Utils,
  Windows;

const
  cTestDataFileName = 'TestData.tst';
  cTCPAddress = 'tcp://%s:6000';
  cCHUNK_SIZE = 250000; // Размер куска

procedure client_thread(args: Pointer; ctx: p_zctx_t; pipe: Pointer); cdecl;
var
  chunks: size_t;
  dealer: Pointer;
  fDummy: Integer;
  frame: p_zframe_t;
  size: size_t;
  total: size_t;
begin
  dealer := zsocket_new(ctx, ZMQ_DEALER);
  fDummy := zsocket_connect(dealer, PChar(Format(cTCPAddress, ['127.0.0.1']))); // Например
  zstr_send(dealer, 'fetch');
  total := 0; //  Все байт принято
  chunks := 0; //  Всего кусков принято

  while (true) do begin
    frame := zframe_recv(dealer);
    if frame = nil then
      break; //  Завершение
    Inc(chunks);
    size := zframe_size(frame);
    zframe_destroy(frame);
    Inc(total, size);
    if size = 0 then
      break; //  Весь файл принят
  end;
  Writeln(Format('%d chunks received, %d bytes', [chunks, total]));
  zstr_send(pipe, 'OK');
end;

procedure server_thread(args: Pointer; ctx: p_zctx_t; pipe: Pointer); cdecl;
//  Нить сервера кусками читает файл с диска и отправляет
//  каждый кусок клиенту в отдельном сообщении. Тестируем работу с одним файлом
var
  chunk: p_zframe_t;
  command: PChar;
  data: pByte;
  fDummy: Integer;
  fFS: TFileStream;
  identity: p_zframe_t;
  router: Pointer;
  size: size_t;
begin
  fFS := TFileStream.Create(cTestDataFileName, fmOpenRead);
  try
    router := zsocket_new(ctx, ZMQ_ROUTER);
    //  HWM по умолчанию равно 1000, что может привести к потере сообщений в случае,
    //  если будет отправлено более 1000 порций данных.
    //  Устанавливаем  HWM в значение "бесконечность" - простое, но неумное решение.
    zsocket_set_hwm(router, 0);
    fDummy := zsocket_bind(router, PChar(Format(cTCPAddress, ['*'])));
    while (true) do begin
      //  Первый фрейм каждого сообщения - идентификатор отправителя
      identity := zframe_recv(router);
      if identity = nil then
        break; //  Завершение
      //  Второй фрейм - команда "fetch"
      command := zstr_recv(router);
      assert(command = 'fetch');
      zstr_free(command);
      while (true) do begin
        data := safe_malloc(cCHUNK_SIZE);
        assert(data <> nil);
        size := fFS.Read(data^, cCHUNK_SIZE);
        chunk := zframe_new(data, size);
        zframe_send(identity, router, c_ZFRAME_REUSE + c_ZFRAME_MORE);
        zframe_send(chunk, router, 0);
        if size = 0 then
          break; // В конце - всегда пустой фрейм
      end;
      zframe_destroy(identity);
    end;
  finally
    fFS.Free;
  end;
end;

procedure DoIt;
//  Основная задача запускает нити клиента и сервера:
//  с одним процессом с нитями легче тестировать, чем с несколькими процессами.
var
  client: Pointer;
  ctx: p_zctx_t;
  fStr: PChar;
  fTime: Cardinal;
begin
  fTime := GetTickCount;
  //  Старт серверной и клиентской нитей
  ctx := zctx_new();
  zthread_fork(ctx, @server_thread, nil);
  client := zthread_fork(ctx, @client_thread, nil);
  //  Цикл ожидания работы клиента
  fStr := zstr_recv(client);
  zstr_free(fStr);
  //  Уничтожение серверной нити
  zctx_destroy(ctx);
  fTime := GetTickCount - fTime;
  Writeln(fTime, ' msces');
end;
begin
  doIt;
  readln;
end.

Время выполнения - примерно 4000 миллисекунд.

Как видим, все просто, но уже появилась проблема: если мы отправляем слишком много  сообщений в сторону сокета ROUTER, легко возникает ситуация переполнения. Мы тупо установили HWM - вотермарк в значение 0, то есть, по сути, в "бесконечность". Не очень умное решение, так как теперь нет никакой защиты против исчерпания памяти сервера. Тем не менее, без бесконечного HWM, мы рискуем потерять куски больших файлов.

Если закомментировать строчку
  
zsocket_set_hwm(router, 0);

- то HWM будет установлен в 1000 (значение по умолчанию для ZeroMQ).

Теперь уменьшим размер блока до 10К. Запускаем тест и видим, что он никогда не заканчивается. Как сказано в руководстве к zmq_socket (): в случае переполнения буфера сокета ROUTER новые сообщения отбрасываются.

Таким образом, необходимо контролировать количество данных, которые сервер отправляет клиенту. Нет никакого смысла отправлять данных больше, чем может справиться сеть.
Изменим протокол так, чтобы сервер отсылал клиенту только один кусок за раз.
В этой версии протокола, клиент будет явно запрашивать, "Дай мне кусок N", а сервер будет читать этот кусок файла с диска и отправить его:

Продолжение.

Комментариев нет :

Отправить комментарий