2015-12-21 3 views
0

Мне нужно взять последовательность массива json objects 20Mb, поток в поток, разбить на меньшие массивы из 33 элементов, преобразовать их в html и затем передать в другой поток (для преобразования PDF).Прочитайте и обработайте мелкие куски внутри дуплексного потока в узле

Дело в том, что я не совсем понял, как работают потоки узлов. Я пытаюсь решить его с помощью дуплексного потока, но я не знаю, как объединить входящие фрагменты из верхнего потока и отправить их по частям в поток вниз. В этом коде

jsonReader = fs.createReadStream 'source.json' 

class Convert extends Duplex 

    constructor: -> 
     super readableObjectMode: true 
     # Duplex.call @, readableObjectMode: true 
     @buffer = [] 

    _read: (lines) -> 
     console.log "buffer #{@buffer.length}" 
     if @buffer.length is 0 
      @push null 
     else 
      console.log "lines: #{lines}" 
      page = @buffer.slice 0, 33 
      console.log page.length 
      @buffer.splice 0, 33 
      @push page 

    _write: (data, enconding, next) -> 
     @buffer.push data 
     next() 

convert = new Convert() 

jsonReader.pipe(convert).pipe(process.stdout) 

@ buffer всегда пуст. Где узел хранит куски, поступающие из верхних потоков?

ответ

0

data, который вы принимаете в _write, является буфером, двоичной частью входного файла, вы не получаете объекты или даже строки. Либо вы вручную разбираете куски, чтобы извлекать ваши объекты, либо загружать весь свой файл в память (20Mb не так уж и много) и проанализировать его. Вот пример (я использую event-stream для удобного управления потоками/creat):

es = require('event-stream') 

convert = (path) -> 
    # load and parse your file 
    content = JSON.parse fs.readFileSync(path, 'utf8') 

    es.readable (count, next) -> 
     # emit 33 elements at a time until content.length === 0 
     while content.length 
      this.emit 'data', content.splice(0, 33) 

     # close the stream 
     this.emit 'end' 
     next() 

srcPath = __dirname + '/source.json' 
# convert is a stream 
convert(srcPath) 
    # piping to console.log because process.stdout can't handle objects 
    .pipe(es.map (arr, next) -> 
     console.log arr 
     next() 
    ) 
Смежные вопросы