понедельник, 7 декабря 2015 г.

23.2 ZeroMQ: Пересылка файлов. Модель №2 - клиент запрашивает каждую порцию файла по отдельности.


Продолжение (начало - здесь).

Вторая версия протокола пересылки файла предусматривает, что клиент каждый раз запрашивает по одной порции файла, а сервер, соответственно, возвращает по одной порции на каждый запрос, полученный от клиента:




//**
program Model2_TransferByOneChunk;
//  Пересылка файлов, модель #2
//  Клиент запрашивает по одному куску файла.
//  Это устраняет проблему перегрузки сервера, но перегружает сеть


{$APPTYPE CONSOLE}

uses
  FastMM4,
  SysUtils,
  Classes,
  zmq_h,
  czmq_h,
  ZMQ_Utils,
  Windows;

const
  cTestDataFileName = 'TestData.tst';
  cTCPAddress = 'tcp://%s:6000';
  cCHUNK_SIZE = 250000; // Размер куска

procedure client_thread(args: Pointer; ctx: p_zctx_t; pipe: Pointer); cdecl;
//  Клиентская нить. Каждая порция файла запрашивается отдельно.
//  В итоге исчезает перегрузка сервера, но падает скорость.
var
  chunk: p_zframe_t;
  chunks: size_t;
  dealer: Pointer;
  fDummy: Integer;
  size: size_t;
  total: size_t;
begin
  dealer := zsocket_new(ctx, ZMQ_DEALER);
  zsocket_set_hwm(dealer, 1);
  fDummy := zsocket_connect(dealer, PChar(Format(cTCPAddress, ['127.0.0.1']))); // Например

  total := 0; //  Всего байт принято
  chunks := 0; //  Всего кусков принято

  while (true) do begin
  // Сообщение из 3х фреймов:
    zstr_sendm(dealer, 'fetch'); // Запрос на чтение
    zstr_sendfm(dealer, '%ld', total); // Начало блока в файле
    zstr_sendf(dealer, '%ld', cCHUNK_SIZE); // Длина блока

    chunk := zframe_recv(dealer);
    if chunk = nil then
      break; //  Завершение
    Inc(chunks);
    size := zframe_size(chunk);
    zframe_destroy(chunk);
    Inc(total, size);
    if size < cCHUNK_SIZE then
      break; //  Весь файл принят

  end;
  Writeln(Format('%d chunks received, %d bytes', [chunks, total]));
  zstr_send(pipe, 'OK');
end;

procedure server_thread(args: Pointer; ctx: p_zctx_t; pipe: Pointer); cdecl;
//  Серверная нить ждет запроса клиента, читает порцию файла,
//  и отправляет ее
var
  chunk: p_zframe_t;
  chunksz: size_t;
  chunksz_str: PChar;
  command: PChar;
  data: pByte;
  fDummy: Integer;
  fFS: TFileStream;
  identity: p_zframe_t;
  offset: size_t;
  offset_str: PChar;
  router: Pointer;
  size: size_t;
begin
  fFS := TFileStream.Create(cTestDataFileName, fmOpenRead);
  try
    router := zsocket_new(ctx, ZMQ_ROUTER);
    zsocket_set_hwm(router, 1);
    fDummy := zsocket_bind(router, PChar(Format(cTCPAddress, ['*'])));
    while (true) do begin
      //  Первый фрейм каждого сообщени - идентификатор отправителя
      identity := zframe_recv(router);
      if identity = nil then
        break; //  Завершение

      //  Второй фрем - команда "fetch"
      command := zstr_recv(router);
      assert(command = 'fetch');
      zstr_free(command);

      //  Третий фрейм - смещение куска относительно начала файла
      offset_str := zstr_recv(router);
      offset := StrToInt(offset_str);
      zstr_free(offset_str);

      //  Четвертый фрейм - максимальная величина куска
      chunksz_str := zstr_recv(router);
      chunksz := StrToInt(chunksz_str);
      zstr_free(chunksz_str);

      //  Чтение порции данных из файла
      fFS.Position := offset;
      data := safe_malloc(chunksz);
      assert(data <> nil);

      //  Отправление результата клиенту
      size := fFS.Read(data^, chunksz);
      chunk := zframe_new(data, size);
      zframe_send(identity, router, c_ZFRAME_MORE);
      zframe_send(chunk, router, 0);

    end;
  finally
    fFS.Free;
  end;
end;

procedure DoIt;
//  Основная задача запускает нити клиента и сервера:
//  с одним процессом с нитями легче тестировать, чем с несколькими процессами.
var
  client: Pointer;
  ctx: p_zctx_t;
  fStr: PChar;
  fTime: Cardinal;
begin
  fTime := GetTickCount;
  //  Старт серверной и клиентской нитей
  ctx := zctx_new();
  zthread_fork(ctx, @server_thread, nil);
  client := zthread_fork(ctx, @client_thread, nil);
  //  Цикл ожидания работы клиента
  fStr := zstr_recv(client);
  zstr_free(fStr);
  //  Уничтожение серверной нити
  zctx_destroy(ctx);
  Writeln(GetTickCount - fTime, ' msec')
end;
begin
  doIt;
  readln;
end.

//**

-----------


Время выполнения - примерно 7000 миллисекунд. Эта модель работает гораздо медленнее первой из-за постоянного общения клиента с сервером. На 3000 мс дольше, чем модель 1. Мы теряем 700 микросекунд на каждом цикле запрос-ответ, причем это - для случая, когда клиент и сервер размещены на одной и той же машине. Вроде бы и немного, но большом количестве запросов задержка получается солидной: дополнительные 3 скунды. Почти в два раза медленнее. И это - при работе через localhost! На реальных сетях тормоза будут в тысячу раз больше.

Тем не менее, модель №2 исключает риск истощения памяти. Чтобы доказать это, в коде серверной и клиентской нитей установлены значения HWM в 1.
Кроме того, модель 2 позволяет клиенту выбирать размер пересылаемой порции. Размер может меняться в зависимости от характеристик сети, от типов файлов, или для уменьшения потребления памяти.
Модель 2 позволяет реализовать перезапуск передачи файлов, с любой его части.
Кроме того, она позволяет клиенту в любой момент отменить передачу файлов.

То есть, протокол был бы вполне пригодным, если бы не было необходимости запрашивать каждый кусок заново. Нужно, чтобы сервер отправлял сразу несколько кусков файла, не дожидаясь, пока клиент их запросит.

Так будет работать Модель 3. Продолжение.

Комментариев нет :

Отправить комментарий