2016-04-22 3 views
0

Я пытаюсь создать рабочий процесс с использованием Highland.js. Я не могу понять, как Highland.js может использоваться для этого.Как написать фильтр (используя DB) с Highland.js

У меня есть рабочий процесс на основе потока, как показано ниже (псевдо-код),

read      //fs.createReadStream(...) 
    .pipe(parse)   //JSONStream.parse(...) 
    .pipe(filterDuplicate) //mongoClient.db.collection.count({}) > 0 
    .pipe(transform)  //fn(item) { return tranform(item); } 
    .pipe(write);   //mongoClient.db.collection.insert(doc) 

filterDuplicate просматривает базу данных, чтобы проверить, если прочитать запись существует (используя условие) и возвращает логический результат. Чтобы фильтр работал, для него требуется активное соединение с БД, которое я хочу использовать повторно, пока поток не будет завершен. Один из способов - открыть соединение перед чтением и закрытием события «finish» записи; Это означает, что мне нужно передать соединение как параметр для фильтрации и записи, который будет работать, если оба метода используют одну и ту же базу данных.

В приведенном выше рабочем процессе filterDuplicate и write также могут использовать разные базы данных. Поэтому я ожидал бы, что соединение будет содержаться и управляться с помощью - в каждой функции, что делает его автономной единицей многократного использования.

Я ищу любые материалы о том, как это можно создать с использованием Highland.

Спасибо.

ответ

0

Это не будет так просто, как просто использовать pipe кучу раз. Вы должны использовать наиболее подходящий API-метод для задачи.

Вот грубый пример того, что вы, вероятно, будет в конечном итоге близко к:

read 
    .through(JSONStream.parse([true])) 
    .through((x) => { 
    h((next, push) => { // use a generator for async operations 
     h.wrapCallback(mongoCountQuery)(params) // you don't have to do it this way 
     .collect() 
     .toCallback((err, result) => { 
      if (result > 0) push(err, x); // if it met the criteria, hold onto it 
      return push(null, h.nil); // tell highland this stream is done 
     }); 
    }); 
    }) 
    .merge() // because you've got a stream of streams after that `through` 
    .map(transform) // just your standard map through a transform 
    .through((x) => { 
    h((next, push) => { // another generator for async operations 
     h.wrapCallback(mongoUpdateQuery)(params) 
     .toCallback((err, results) => { 
      push(err, results); 
      return push(null, h.nil); 
     }); 
    }); 
    }) 
    .merge() // another stream-of-streams situation 
    .toCallback(cb); // call home to say we're done 
+0

я искал специально для механизмов нагорных (если таковые имеются) для сохранения состояния. В этом случае открытое соединение db, для mongoCountQuery и mongoUpdateQuery, и триггер, когда он подходит для закрытия. После некоторого чтения я думаю, что состояние должно поддерживаться вне и явно передаваться как контекст для функций обработки потока. Таким образом, функции обработки потока просто используют контекст для выполнения своей работы, а Highland фокусируется на координации функций потока. – Krishnan

+0

Через четыре месяца я склоняюсь к тому, чтобы опросить мой собственный ответ. – amsross

Смежные вопросы