2014-12-31 5 views
1

У меня есть поток каталогов из модуля readdirp.Вложенные потоковые операции в Highland.js

Я хочу: -

  • поиск файла, используя регулярное выражение (например README.*) в каждом каталоге
  • прочитать первую строку этого файла, который начинается не с #
  • печати из каждой директории и этой первой строки без заголовка README в каталоге.

Я пытаюсь сделать это используя потоки и highland.js.

Я застреваю, пытаясь обработать поток всех файлов внутри каждого каталога.

h = require 'highland' 

dirStream = readdirp root: root, depth: 0, entryType: 'directories' 

dirStream = h(dirStream) 
    .filter (entry) -> entry.stat.isDirectory() 
    .map (entry) -> 

    # Search all files in the directory for README. 
    fileStream = readdirp root: entry.fullPath, depth: 0, entryType: 'files', fileFilter: '!.DS_Store' 
    fileStream = h(fileStream).filter (entry) -> /README\..*/.test entry.name 
    fileStream.each (file) -> 
     readmeStream = fs.createReadStream file 
     _(readmeStream) 
     .split() 
     .takeUntil (line) -> not line.startsWith '#' and line isnt '' 
     .last(1) 
     .toArray (comment) -> 
      # TODO: How do I access `comment` asynchronously to include in the return value of the map? 

    return {name: entry.name, comment: comment} 

ответ

4

Лучше всего рассматривать Хайленд потоки, как неизменный, и такие операции, как filter и map возвращаются новые потоки, которые зависят от старого потока, а не модификаций старого потока.

Кроме того, методы Хайлендов ленивы: Вы должны назвать только each или toArray, когда вы абсолютно необходимы данные прямо сейчас.

Стандартный способ асинхронного отображения потока - flatMap. Это как map, но функция, которую вы ему даете, должна возвращать поток. Поток, который вы получаете от flatMap, является конкатенацией всех возвращенных потоков. Поскольку новый поток зависит от всех старых потоков в порядке, его можно использовать для последовательности асинхронного процесса.

Я бы изменить ваш пример следующим образом (уточнить некоторые имена переменных):

h = require 'highland' 

readmeStream = h(readdirp root: root, depth: 0, entryType: 'directories') 
    .filter (dir) -> dir.stat.isDirectory() 
    .flatMap (dir) -> 
    # Search all files in the directory for README. 
    h(readdirp root: dir.fullPath, depth: 0, entryType: 'files', fileFilter: '!.DS_Store') 
    .filter (file) -> /README\..*/.test file.name 
    .flatMap (file) -> 
     h(fs.createReadStream file.name) 
     .split() 
     .takeUntil (line) -> not line.startsWith '#' and line isnt '' 
     .last(1) 
     .map (comment) -> {name: file.name, comment} 

Давайте прогуляемся, хотя типы в этом коде. Во-первых, обратите внимание, что flatMap имеет тип (в Haskellish нотации) Stream a → (a → Stream b) → Stream b, то есть он принимает поток, содержащий некоторые вещи типа a и функцию, ожидающую вещи типа a и возвращающие потоки, содержащие b s, и возвращает поток, содержащий b s. Стандартно для типов коллекций (таких как поток и массив) для реализации flatMap в качестве конкатенации возвращенных коллекций.

h(readdirp root: root, depth: 0, entryType: 'directories') 

Предположим, у этого типа Stream Directory. filter не изменяет тип, поэтому flatMap будет Stream Directory → (Directory → Stream b) → Stream b. Мы видим, что функция возвращает:

h(readdirp root: dir.fullPath, depth: 0, entryType: 'files', fileFilter: '!.DS_Store') 

Зов это Stream File, поэтому второй flatMap является Stream File → (File → Stream b) → Stream b.

h(fs.createReadStream file.name) 

Это Stream String. split, takeUntil и last не меняйте это, так что делает map? map очень похож на flatMap: его тип Stream a → (a → b) → Stream b.В этом случае a - String, а b - тип объекта {name : String, comment : String}. Затем map возвращает поток этого объекта, который возвращает итоговую функцию flatMap. Шаг вверх и b во втором flatMap - это объект, поэтому первая функция flatMap также возвращает поток объекта, поэтому весь поток равен Stream {name : String, comment : String}.

Обратите внимание, что из-за ленивости Хайленда это фактически не приводит к потоковой передаче или обработке. Вы должны использовать each или toArray, чтобы вызвать thunk и начать конвейер. В each обратный вызов будет вызываться с вашим объектом. В зависимости от того, что вы хотите делать с комментариями, может быть лучше, например, flatMap (если вы пишете их в файл, например).

Ну, я не собирался писать эссе. Надеюсь это поможет.

Смежные вопросы