2014-12-04 5 views
0

У меня есть Node.js программа, которая использует потоки для чтения файла (nodejs event stream setting a variable per stream)HTTP POST упругой поиски события поток сыпучего

Я хотел бы использовать ту же программу, чтобы записать эти данные в упругий поиск. Я написал небольшую функцию записи

var writeFunction = function(data) { 
    //console.log(data); 
    var client = request.newClient("http://localhost:9200"); 
    client.post('/newtest3/1',data,function(err,res,body) { 
     return console.log(res.statusCode); 
    }); 
}; 

и подключили это с потоковой

var processMyFile = function(file) { 
    var stream = getStream(file); 
    var nodeName = stream.nodeName; 
    stream 
     .pipe(es.split()) 
     .on('end',endFunction) 
     .pipe(es.map(function(data,cb) { 
      processFunction(nodeName,data,cb); 
     })) 
     .pipe(es.map(function(data,cb) { 
      writeFunction(data); 
     })); 

} 

Вышеуказанные работы, как ожидается асинхронно и записывает данные, за исключением того, что занимает много времени .Это также, кажется, работа в качестве буфера, так как запись занимает гораздо больше времени, чем чтение. (Преимущество использования трубы) Я знаю, что в эластичном поиске есть объемный интерфейс, и я могу импортировать его. Пример shakesphere.json в руководстве по началу работы в Kibana (http://www.elasticsearch.org/guide/en/kibana/current/using-kibana-for-the-first-time.html)

Это означает, что мне нужно будет создать файл в формате, необходимом для массового импорта, а затем запустить программу завивки и т. Д. Я бы хотел избежать создания временный файл.

Есть ли более простой способ для импорта данных в elasticsearch быстрее, так как часть процесса потокового

ответ

1

elasticsearch-streams поможет вам использовать насыпной интерфейс с потоковым, без необходимости писать файл JSon первым.

Я считаю, что ваш код будет более или менее так:

var TransformToBulk = require('elasticsearch-streams').TransformToBulk 
var WritableBulk = require('elasticsearch-streams').WritableBulk; 
var client = new require('elasticsearch').Client(); 

var bulkExec = function(bulkCmds, callback) { 
    client.bulk({ 
    index : 'newtest3', 
    type : '1', 
    body : bulkCmds 
    }, callback); 
}; 

var ws = new WritableBulk(bulkExec); 
var toBulk = new TransformToBulk(function getIndexTypeId(doc) { return { _id: doc.id }; }); 

var processMyFile = function(file) { 
    var stream = getStream(file); 

    stream 
    .pipe(toBulk) 
    .pipe(ws) 
    .on('close', endFunction) 
    .on('err', endFunction); 
} 
Смежные вопросы