2016-03-25 3 views
3

Я изучаю SWE в новом граде Go (и люблю его).GoLang: Decompress bz2 in on goroutine, потребляйте в другом goroutine

Я создаю парсер для файлов дампа Wikipedia - в основном огромный XML-файл с сжатым файлом bzip2 (~ 50 ГБ без сжатия).

Я хочу сделать как потоковое декомпрессии, так и разбор, что звучит достаточно просто. Для декомпрессии, я:

inputFilePath := flag.Arg(0) inputReader := bzip2.NewReader(inputFile)

А затем передать читателя к XML-парсер:

decoder := xml.NewDecoder(inputFile)

Однако, так как декомпрессия и синтаксический дорогие операции, я хотел бы иметь они запускаются на отдельных процедурах Go, чтобы использовать дополнительные ядра. Как я буду делать это в Go?

Единственное, что я могу придумать, это обернуть файл в chan [] байт и реализовать интерфейс io.Reader, но я полагаю, что это может быть способ создания (и более чистого) использования.

Кто-нибудь когда-либо делал что-то подобное?

Спасибо! Manuel

ответ

2

Вы можете использовать io.Pipe, а затем использовать io.Copy раздвинуть распакованные данные в трубу, и читать его в другом goroutine:

package main 

import (
    "bytes" 
    "encoding/json" 
    "fmt" 
    "io" 
    "sync" 
) 

func main() { 

    rawJson := []byte(`{ 
      "Foo": { 
       "Bar": "Baz" 
      } 
     }`) 

    bzip2Reader := bytes.NewReader(rawJson) // this stands in for the bzip2.NewReader 

    var wg sync.WaitGroup 
    wg.Add(2) 

    r, w := io.Pipe() 

    go func() { 
     // write everything into the pipe. Decompression happens in this goroutine. 
     io.Copy(w, bzip2Reader) 
     w.Close() 
     wg.Done() 
    }() 

    decoder := json.NewDecoder(r) 

    go func() { 
     for { 
      t, err := decoder.Token() 
      if err != nil { 
       break 
      } 
      fmt.Println(t) 
     } 
     wg.Done() 
    }() 

    wg.Wait() 
} 

http://play.golang.org/p/fXLnfnaWYA

+1

Это именно то, что мне было нужно, спасибо! К сожалению, кажется, что производительность декомпрессора stdard lib bzip2 невелика, поэтому он все еще является ограничивающим фактором. Я могу переключиться на этот компрессор: https://godoc.org/github.com/dsnet/compress/bzip2 Однако он все еще примерно на 33% медленнее, чем что-то вроде pbzip2. –

+0

Сколько вы ускорили, в конце концов, @ManuelMenzella? Мне нравится внешний вид этого кода - похоже, он должен работать, но в моем тестировании он только немного быстрее, чем все однопоточные (67 сек против 72 сек на 1М записи). Любая идея, что я могу делать неправильно, @ user1431317? – EM0

+0

Возможно, это все еще ограничено тем, насколько быстро декомпрессия bzip2 может подавать данные, а декодирование xml не занимает столько мощности процессора. Труба, вероятно, добавляет некоторые накладные расходы, хотя у io.Copy есть оптимизация, когда один или оба конца - это io.Reader/io.Writer. Возможно, он выделяет много небольших временных буферов, и это вызывает слишком много мусора. Может быть, буферизованный читатель или писатель помог бы. Вы должны профилировать свое приложение (как профиль cpu, так и mem-mem-mem, могут помочь вам найти много ненужных распределений). – user1431317

Смежные вопросы