2012-01-21 3 views
2

Я использую EventMachine и EM-Synchrony на сервере API REST. Когда вы получаете запрос POST с большим двоичным файлом в теле, я получаю его в кусках, записывая эти фрагменты в Tempfile, не блокируя реактор.EventMachine чтение и запись файлов в кусках

Затем, в некоторый момент, мне нужно прочитать этот файл в кусках и записать эти фрагменты в окончательный файл. Это работает, но он блокирует реактор, как ожидалось, и не может найти способ заставить его работать без блокировки.

Я называю эту функцию в какой-то момент, передавая ему TempFile и новое имя файла:

def self.save(tmp_file, new_file) 
    tmp = File.open(tmp_file, "rb") 
    newf = File.open(new_file, "wb") 
    md5 = Digest::MD5.new 

    each_chunk(tmp, CHUNKSIZE) do |chunk| 
    newf << chunk 
    md5.update chunk 
    end 

    md5.hexdigest 
end 

def self.each_chunk(file, chunk_size=1024) 
    yield file.read(chunk_size) until file.eof? 
end 

Я читала все другие подобные вопросы здесь StackOverflow, пытаясь использовать EM # next_tick, который это, пожалуй, решение (не столько опыт EM), но и не может заставить его работать, возможно, я помещаю его в неправильные места.

Кроме того, я попробовал EM # defer, но мне нужно, чтобы функция дождалась завершения процесса чтения/записи до того, как он вернет md5, как в моем основном файле, после вызова этой функции я делаю обновление базы данных с возвращаемым значением.

Если кто-то может мне помочь, я был бы благодарен.

EDIT 1

Мне нужно, что функция сохранения только возвращает после завершения файлы для чтения/записи, как и в функции вызывающего абонента, я жду окончательного значения md5, что-то вроде этого:

def copy_and_update(...) 
    checksum = SomeModule.save(temp_file, new_file) 
    do_database_update({:checksum => checksum}) # only with the final md5 value 
end 
+1

'new' - это имя метода класса. Поэтому, если это не модуль, это может быть одна ошибка. Измените его на что-то вроде 'new_file' и протестируйте его. – Linuxios

+0

Whell, его только в этом примере кода, я не использую имена переменных. –

+0

ОК. Я просто подумал, что это может быть проблемой, поскольку самые маленькие проблемы могут показаться самыми большими. – Linuxios

ответ

1

Вы должны вводить что-то там, чтобы разбить его:

def self.each_chunk(file, chunk_size=1024) 
    chunk_handler = lambda { 
    unless (file.eof?) 
     yield file.read(chunk_size) 


    EM.next_tick(&chunk_handler) 
    end 
    } 

    EM.next_tick(&chunk_handler) 
end 

это своего рода грязный, чтобы сделать это таким образом, но такое это асинхронное программирование.

+0

Привет, большое спасибо! Я думаю, что это решает проблему чтения, но как насчет части записи (newf << chunk)? Его блокировка реактора. Кроме того, я обнаружил, что файл успешно скопирован, но #save возвращает переменную md5 после первой итерации, поэтому возвращаемый md5 является md5 первого фрагмента, так как он не ждет процесса копирования завершение. Я обновил первое сообщение (EDIT 1), поэтому вам будет легче помочь. –

+0

Я не знаю, что есть встроенный способ канала для чтения и записи через обработчик EventMachine. Лучший подход заключается в том, чтобы ваши блокирующие звонки были короткими или, по крайней мере, нечастыми. В этом случае ОС должна обрабатывать буферизацию записей и изолировать вас от проблем ввода-вывода. – tadman

Смежные вопросы