2015-05-06 4 views
3

Попытка опроса очереди очереди обслуживания Azure с использованием WebJob, написанной в Node.js. Я создал 2 WebJobs. Первый по требованию и отправляет 10 уникальных сообщений в очередь. Второе задание является непрерывным и опрашивает очередь для сообщений.Опрос очереди служебной шины Azure с Azure WebJob с использованием Node.js

Встречая следующие вопросы:

  1. Опрос медленно. Для получения 10 сообщений требуется в среднем около 10 минут. Подробнее см. Ниже. В принципе непригодна на этой скорости. Вся задержка - от получения ответа от receiveQueueMessage. Время отклика варьируется от 0 секунд до ~ 120 секунд, в среднем 60 секунд.

  2. Сообщения принимаются в случайном порядке. Не FIFO.

  3. Иногда сообщения принимаются в два раза, несмотря на то, что они считываются в режиме ReceiveAndDelete (я пробовал без параметров режима чтения, который должен по умолчанию для ReceiveAndDelete с {isReceiveAndDelete:true} и {isPeekLock:false} с теми же результатами).

  4. Когда очередь пуста, она должна держать запрос на получение открытым в течение дня, но всегда возвращается с ошибкой сообщения об ошибке через 230 секунд. Согласно документации, максимально 24 дней, так что я не знаю, где 230 секунд приходит от:

Максимальное время ожидания для блокировки операции приема в Service Bus очереди составляет 24 дней. Однако тайм-ауты на основе REST имеют максимальное значение из 55 секунд.

В принципе ничего не работает, как рекламируется. Что я делаю не так?

Отправить сообщение Test Работа:

var uuid = require('node-uuid'); 
var azure = require('azure'); 
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString); 
var messagesToSend = 10; 

sendMessage(0); 

function sendMessage(count) 
{ 
    var message = { 
     body: 'test message', 
     customProperties: { 
      message_number: count, 
      sent_date: new Date 
     }, 
     brokerProperties: { 
      MessageId: uuid.v4() //ensure that service bus doesn't think this is a duplicate message 
     } 
    }; 

    serviceBus.sendQueueMessage(process.env.busSearchQueueName, message, function(err) { 

     if (!err) { 
      console.log('sent test message number ' + count.toString()); 
     } else { 
      console.error('error sending message: ' + err); 
     } 

    }); 

    //wait 5 seconds to ensure messages are received by service bus in correct order 
    if (count < messagesToSend) { 
     setTimeout(function(newCount) { 
      //send next message 
      sendMessage(newCount); 
     }, 5000, count+1); 
    } 
}  

Получить сообщение Continuous Работа:

console.log('listener job started'); 
var azure = require('azure'); 
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString); 
listenForMessages(serviceBus); 

function listenForMessages(serviceBus) 
{ 
    var start = process.hrtime(); 
    var timeOut = 60*60*24; //long poll for 1 day 
    serviceBus.receiveQueueMessage(process.env.busSearchQueueName, {timeoutIntervalInS: timeOut, isReceiveAndDelete: true}, function(err, message) { 

     var end = process.hrtime(start); 
     console.log('received a response in %ds seconds', end[0]); 

     if (err) { 

      console.log('error requesting message: ' + err); 
      listenForMessages(serviceBus); 

     } else { 

      if (message !== null && typeof message === 'object' && 'customProperties' in message && 'message_number' in message.customProperties) { 

       console.log('received test message number ' + message.customProperties.message_number.toString()); 
       listenForMessages(serviceBus); 

      } else { 

       console.log('invalid message received'); 
       listenForMessages(serviceBus); 

      } 

     } 

    }); 
} 

Sample Log Output:

[05/06/2015 21:50:14 > 8c2504: SYS INFO] Status changed to Running 
[05/06/2015 21:50:14 > 8c2504: INFO] listener job started 
[05/06/2015 21:51:23 > 8c2504: INFO] received a response in 1s seconds 
[05/06/2015 21:51:23 > 8c2504: INFO] received test message number 0 
[05/06/2015 21:51:25 > 8c2504: INFO] received a response in 2s seconds 
[05/06/2015 21:51:26 > 8c2504: INFO] received test message number 4 
[05/06/2015 21:51:27 > 8c2504: INFO] received a response in 1s seconds 
[05/06/2015 21:51:27 > 8c2504: INFO] received test message number 7 
[05/06/2015 21:51:28 > 8c2504: INFO] received a response in 0s seconds 
[05/06/2015 21:51:29 > 8c2504: INFO] received test message number 9 
[05/06/2015 21:51:49 > 8c2504: INFO] received a response in 20s seconds 
[05/06/2015 21:51:49 > 8c2504: INFO] received test message number 1 
[05/06/2015 21:53:35 > 8c2504: INFO] received a response in 106s seconds 
[05/06/2015 21:53:35 > 8c2504: INFO] received test message number 1 
[05/06/2015 21:54:26 > 8c2504: INFO] received a response in 50s seconds 
[05/06/2015 21:54:26 > 8c2504: INFO] received test message number 5 
[05/06/2015 21:54:35 > 8c2504: INFO] received a response in 9s seconds 
[05/06/2015 21:54:35 > 8c2504: INFO] received test message number 9 
[05/06/2015 21:55:28 > 8c2504: INFO] received a response in 53s seconds 
[05/06/2015 21:55:28 > 8c2504: INFO] received test message number 2 
[05/06/2015 21:57:26 > 8c2504: INFO] received a response in 118s seconds 
[05/06/2015 21:57:26 > 8c2504: INFO] received test message number 6 
[05/06/2015 21:58:28 > 8c2504: INFO] received a response in 61s seconds 
[05/06/2015 21:58:28 > 8c2504: INFO] received test message number 8 
[05/06/2015 22:00:35 > 8c2504: INFO] received a response in 126s seconds 
[05/06/2015 22:00:35 > 8c2504: INFO] received test message number 3 
[05/06/2015 22:04:25 > 8c2504: INFO] received a response in 230s seconds 
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive 
[05/06/2015 22:08:16 > 8c2504: INFO] received a response in 230s seconds  
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive 

ответ

4

и вопрос был в очереди я использую распределяли (опция по умолчанию при создании очереди на портале Azure). Как только я создал новую очередь, которая не была разбита на разделы, все работало так, как ожидалось, без отставания (кроме странного 230-секундного таймаута при длительной попытке опроса). Таким образом, в основном библиотека node.js не работает для секционированных очередей. Вообще. Потратил много дней на то, чтобы понять это. Оставит это здесь для других.

+0

Эй Джоэл. Я знаю, что это не имеет никакого отношения к вопросу, но поскольку это новый. Как вы создали новый WebJob? Я не могу найти какой-либо вариант в любом меню на старом или новом портале в Azure ... Мне грустно и отчаянно ... Google не помогает в этом :( – lu1s

+1

Вам нужно настроить «веб-сайт» приложение ", и это вариант в веб-приложениях. Затем вы можете использовать его с помощью непрерывной интеграции git. См. [здесь] (http://blog.amitapple.com/post/74215124623/deploy-azure-webjobs/#.VU2BHa3BzGc) – Joel

+0

Большое спасибо! У меня уже есть веб-api, развернутый через Github в качестве веб-приложения. Я добавлю WebJobs именно там. – lu1s

1

Отключение флагов секционирования очереди служебной шины тоже сработало для меня.

С секционированной очередью некоторые сообщения имели задержки более 30 минут. Простой веб-клиент DotNet может загружать все сообщения без каких-либо задержек. Однако, как только nodejs должен был загружать сообщения, только первое сообщение было загружено без проблем, после чего появлялись задержки. Игра с nodejs для изменения параметров агента http keepalive и тайм-аута сокета не улучшала ситуацию.

После остановки nodejs мне пришлось ждать несколько минут, прежде чем клиент DotNet фактически начал работать без проблем. Это воспроизводилось несколько раз. Я также обнаружил, что простая программа веб-клиента DotNet показала похожие проблемы, после того, как она была запущена и остановлена ​​несколько раз подряд.

Во всяком случае, ваш пост показал мне решение: Выключите многораздельный флаг :)

1

Попробуйте использовать AMQP читать сообщения из лазури службы шины распределяли очередей и это будет работать для секционированной темы/очередь, и вам даже не нужно много опробовать.

const AMQPClient = require('amqp10').Client; 
const Policy = require('amqp10').Policy; 

const protocol = 'amqps'; 
const keyName = 'RootManageSharedAccessKey'; 
const sasKey = 'your_key_goes_here'; 
const serviceBusHost = 'namespace.servicebus.windows.net'; 
const uri = `${protocol}://${encodeURIComponent(keyName)}:${encodeURIComponent(sasKey)}@${serviceBusHost}`; 
const queueName = 'partitionedQueueName'; 
const client = new AMQPClient(Policy.ServiceBusQueue); 
client.connect(uri) 
.then(() => Promise.all([client.createReceiver(queueName)])) 
.spread((receiver) => { 
    console.log('--------------------------------------------------------------------------'); 
    receiver.on('errorReceived', (err) => { 
     // check for errors 
     console.log(err); 
    }); 
    receiver.on('message', (message) => { 
     console.log('Received message'); 
     console.log(message); 
     console.log('----------------------------------------------------------------------------'); 
    }); 
}) 
.error((e) => { 
    console.warn('connection error: ', e); 
}); 

https://www.npmjs.com/package/amqp10

Смежные вопросы