Задача: переслать файл.
ZMQ "искаропки" отлично справляется с пересылкой сообщений и оповещениями, но вот с пересылкой файлов все не так очевидно. Можно, конечно, тупо попробовать засунуть файл в сообщение целиком.
Однако, есть возражения:
1. Оперативная память, к сожалению, не резиновая.
2. Ладно, отправили по сети файл-сообщение. А сеть оказалась медленной (какой-нибудь древний вай-фай), да еще и неустойчивой. ZMQ, как мы помним, пытается обеспечить доставку сообщения даже в случае сбоев. Итак, несколько попыток пересылки по 1 гигабайту - здорово?
3. А если этот гигантский файл хотят скачать несколько клиентов одновременно? Каждому клиенту выделить буфер нужного размера, отправить и ждать? Свободная память закончится гораздо быстрее. То есть, "правильный" протокол передачи файлов должен учитывать ограниченность размеров ОЗУ.
...
...начинаем строить правильный протокол передачи файлов.
Подготовка.
Для тестирования протокола создадим большой файл, состоящий из квазислучайной последовательности байт. Это нужно, чтобы при тестировании избежать возможного эффекта сжатия, данных, который может появиться из-за различных компонентов сети.
program TD;
{$APPTYPE CONSOLE}
uses
SysUtils, Classes;
const
cBlockSize = 1024;
cFileSize = 1024 * 1024 * 1024;
cTestDataFileName = 'TestData.tst';
procedure DoIt;
var
fFS: TFileStream;
i: Integer;
j: Integer;
fBuff: array of Integer;
begin
SetLength(fBuff, cBlockSize);
Randomize;
fFS := TFileStream.Create(cTestDataFileName, fmOpenWrite);
try
for i := 0 to Pred(cFileSize div (Length(fBuff) * SizeOf(Integer))) do begin
for j := Low(fBuff) to High(fBuff) do
fBuff[j] := Random(-1);
fFS.Write(fBuff[0], Length(fBuff) * SizeOf(Integer))
end;
finally
fFS.Free
end;
end;
begin
doIt;
readln;
end.
Компилируем, запускаем. Появился файл размером 1 Гб.
Проверим, насколько файл "несжимаем": сожмем его с помощью 7z:
Очень хорошо: после обработки 7z файл ничуть не сжался, даже наоборот.
Размер 1Гб достаточен, чтобы почувствовать проблемы, когда такой файл пытаются загрузить несколько клиентов.
Для начала измерим время, требуемое для последовательного чтения файла с диска. Все, что больше - будет временем, которое съест наш протокол пересылки файлов (включая сетевые задержки).
program ReadDataFromHDD_Time;
{$APPTYPE CONSOLE}
uses
SysUtils, Classes, Windows;
const
cBlockSize = 1024;
cTestDataFileName = 'TestData.tst';
procedure DoIt;
var
fTicks: Cardinal;
fIFS: TFileStream;
fBuff: array of Integer;
fCnt: Integer;
begin
SetLength(fBuff, cBlockSize);
fIFS := TFileStream.Create(cTestDataFileName, fmOpenRead);
try
fTicks := GetTickCount();
while fIFS.Read(fBuff[0], Length(fBuff) * SizeOf(Integer)) > 0 do ;
fTicks := GetTickCount() - fTicks;
Writeln('Time(msec):', fTicks);
finally
fIFS.Free;
end;
end;
begin
doIt;
readln;
end.
Запустим программу несколько раз. У меня получилось в среднем 375 миллисекунд (плюс-минус 25 миллисекунд). Запомним его
Начнем кодировать непосредственно пересылку файла.
Вариант 1. Клиент запрашивает файл, а сервер кусками без остановки пересылает его.
Для тестирования снова создадим многонитевое приложение:
program Model1_EntireFileByLargeChunks;
// Пересылка файлов, модель #1
// Сервер отсылает весь файл клиенту
// большими кусками без попыток управления потоком
{$APPTYPE CONSOLE}
uses
FastMM4,
SysUtils,
Classes,
zmq_h,
czmq_h,
ZMQ_Utils,
Windows;
const
cTestDataFileName = 'TestData.tst';
cTCPAddress = 'tcp://%s:6000';
cCHUNK_SIZE = 250000; // Размер куска
procedure client_thread(args: Pointer; ctx: p_zctx_t; pipe: Pointer); cdecl;
var
chunks: size_t;
dealer: Pointer;
fDummy: Integer;
frame: p_zframe_t;
size: size_t;
total: size_t;
begin
dealer := zsocket_new(ctx, ZMQ_DEALER);
fDummy := zsocket_connect(dealer, PChar(Format(cTCPAddress, ['127.0.0.1']))); // Например
zstr_send(dealer, 'fetch');
total := 0; // Все байт принято
chunks := 0; // Всего кусков принято
while (true) do begin
frame := zframe_recv(dealer);
if frame = nil then
break; // Завершение
Inc(chunks);
size := zframe_size(frame);
zframe_destroy(frame);
Inc(total, size);
if size = 0 then
break; // Весь файл принят
end;
Writeln(Format('%d chunks received, %d bytes', [chunks, total]));
zstr_send(pipe, 'OK');
end;
procedure server_thread(args: Pointer; ctx: p_zctx_t; pipe: Pointer); cdecl;
// Нить сервера кусками читает файл с диска и отправляет
// каждый кусок клиенту в отдельном сообщении. Тестируем работу с одним файлом
var
chunk: p_zframe_t;
command: PChar;
data: pByte;
fDummy: Integer;
fFS: TFileStream;
identity: p_zframe_t;
router: Pointer;
size: size_t;
begin
fFS := TFileStream.Create(cTestDataFileName, fmOpenRead);
try
router := zsocket_new(ctx, ZMQ_ROUTER);
// HWM по умолчанию равно 1000, что может привести к потере сообщений в случае,
// если будет отправлено более 1000 порций данных.
// Устанавливаем HWM в значение "бесконечность" - простое, но неумное решение.
zsocket_set_hwm(router, 0);
fDummy := zsocket_bind(router, PChar(Format(cTCPAddress, ['*'])));
while (true) do begin
// Первый фрейм каждого сообщения - идентификатор отправителя
identity := zframe_recv(router);
if identity = nil then
break; // Завершение
// Второй фрейм - команда "fetch"
command := zstr_recv(router);
assert(command = 'fetch');
zstr_free(command);
while (true) do begin
data := safe_malloc(cCHUNK_SIZE);
assert(data <> nil);
size := fFS.Read(data^, cCHUNK_SIZE);
chunk := zframe_new(data, size);
zframe_send(identity, router, c_ZFRAME_REUSE + c_ZFRAME_MORE);
zframe_send(chunk, router, 0);
if size = 0 then
break; // В конце - всегда пустой фрейм
end;
zframe_destroy(identity);
end;
finally
fFS.Free;
end;
end;
procedure DoIt;
// Основная задача запускает нити клиента и сервера:
// с одним процессом с нитями легче тестировать, чем с несколькими процессами.
var
client: Pointer;
ctx: p_zctx_t;
fStr: PChar;
fTime: Cardinal;
begin
fTime := GetTickCount;
// Старт серверной и клиентской нитей
ctx := zctx_new();
zthread_fork(ctx, @server_thread, nil);
client := zthread_fork(ctx, @client_thread, nil);
// Цикл ожидания работы клиента
fStr := zstr_recv(client);
zstr_free(fStr);
// Уничтожение серверной нити
zctx_destroy(ctx);
fTime := GetTickCount - fTime;
Writeln(fTime, ' msces');
end;
begin
doIt;
readln;
end.
Время выполнения - примерно 4000 миллисекунд.
Как видим, все просто, но уже появилась проблема: если мы отправляем слишком много сообщений в сторону сокета ROUTER, легко возникает ситуация переполнения. Мы тупо установили HWM - вотермарк в значение 0, то есть, по сути, в "бесконечность". Не очень умное решение, так как теперь нет никакой защиты против исчерпания памяти сервера. Тем не менее, без бесконечного HWM, мы рискуем потерять куски больших файлов.
Если закомментировать строчку
zsocket_set_hwm(router, 0);
- то HWM будет установлен в 1000 (значение по умолчанию для ZeroMQ).
Теперь уменьшим размер блока до 10К. Запускаем тест и видим, что он никогда не заканчивается. Как сказано в руководстве к zmq_socket (): в случае переполнения буфера сокета ROUTER новые сообщения отбрасываются.Таким образом, необходимо контролировать количество данных, которые сервер отправляет клиенту. Нет никакого смысла отправлять данных больше, чем может справиться сеть.
Изменим протокол так, чтобы сервер отсылал клиенту только один кусок за раз.
В этой версии протокола, клиент будет явно запрашивать, "Дай мне кусок N", а сервер будет читать этот кусок файла с диска и отправить его:
Продолжение.




Комментариев нет :
Отправить комментарий