2017-02-14 4 views
0

Я разрабатываю компонент node.js, который слушает очередь сообщений (ActiveMQ) и добавляет полученные сообщения к повторению в партиях (должно быть 20 за пакет) ,Приложение Node.js слушает очередь сообщений и добавляет сообщения redis асинхронно

Нет проблем, если количество сообщений, полученных от ActiveMQ, составляет 10 в секунду или меньше.

Моя проблема заключается в том, что сообщения добавляются в очередь каждые 4 миллисекунды. Это приводит к тому, что количество записей, добавляемых в партию, иногда составляет более 20 штук.

const stompit = require('stompit'); 
var uuid = require('node-uuid'); 
var Redis = require('ioredis'); 

var redis = new Redis(); 
var pipeline = redis.pipeline(); 
var batchCounter = 0; 

stompit.connect({ host: 'localhost', port: 61613 }, function(err1, client) { 

client.subscribe({ destination: 'MyQueue' }, function(err2, msg) { 
    msg.readString('UTF-8', function(err3, body) { 

     if (batchCounter >= 20){ 
      pipeline.exec(function(err4, results) { 
       pipeline = redis.pipeline(); 
       batchCounter = 0; 
       console.log(results); 
      }); 
     } 

     batchCounter++; 
     pipeline.set(uuid.v1(), JSON.stringify(body)); 
     //client.disconnect(); 
     }); 
    }); 
    }); 

Как решить эту проблему? Спасибо

ответ

0

В итоге я контролировал количество записей за партию с использованием флагов. Я считаю, что есть еще одно возможное, возможно более эффективное, исправление, которое контролирует процесс чтения из ActiveMQ. Но в этой ситуации флаги выполнили эту работу для меня. Полный код с помощью флагов в следующей ссылке:

https://github.com/TamerB/ToActiveMQToRedis/blob/master/consumer/app.js

0

Попробуйте сбросить конвейер до вызова метода .exec, который, как я полагаю, является асинхронным методом. Поскольку .exec запускается в какой-то момент в будущем, приращение и pipeline.set могут запускаться перед ним.

Следующий код сохраняет текущий трубопровод и создает новую синхронно перед .exec

if (batchCounter >= 20){ 
    let fullpipeline = pipeline; 
    pipeline = redis.pipeline(); 
    batchCounter = 0; 
    fullpipeline.exec(function(err4, results) { 
     console.log(err4, results); 
    }); 
} 

Затем новые сообщения должны затем быть добавлены только к новому трубопроводу.

+0

Это все еще возможно, что вы могли бы в конечном итоге с несколькими трубопроводов собирается Redis в одно время. Если это также вызовет проблемы, вам может понадобиться очередь fullpipelines для обработки. – Matt