2016-01-06 4 views
0

У меня есть следующий код, который работает. Я начал пролонгировать его с помощью Bluebird, однако я не уверен, как я могу обещать обработку массива сообщений.Как обещать AWS SQS с помощью Bluebird

var s3 = new AWS.S3(); 
var sqs = new AWS.SQS(); 
// This notification call is triggered by the latest message but there may 
// be earlier unprocessed messages. So, we request the maximum number of 
// messages (10) from the queue and process and then remove from the queue 
// all of them. 
sqs.receiveMessage({ 
    QueueUrl: settings.sqsQueueUrl[prdOrDev], 
    /* required */ 
    WaitTimeSeconds: 20, // to enable long polling, which polls all servers for any unprocessed SQS messages 
    VisibilityTimeout: 120, // without this longpolling didn't work. 
    MaxNumberOfMessages: 10 
}, function(err, data) { 
    if (err) { 
    console.error('SQS receiveMessage failed: ', err, err.stack); 
    return res.status(400).json({ 
     success: false 
    }); 
    } else { 
    var messages = data.Messages; 
    messages.forEach(function(message) { 
     var body = JSON.parse(message.Body); 
     var sesMsg = JSON.parse(body.Message); 
     s3.getObject({ 
     Bucket: sesMsg.receipt.action.bucketName, 
     Key: sesMsg.receipt.action.objectKey 
     }, function(err, data2) { 
     if (err) { 
      console.error('S3 getObject failed: ', err, err.stack); 
     } else { 
      sqs.deleteMessage({ 
      QueueUrl: settings.sqsQueueUrl[prdOrDev], 
      /* required */ 
      ReceiptHandle: message.ReceiptHandle 
      }, function(err, data) { 
      if (err) { 
       console.error('SQS deleteMessage failed: ', err, err.stack); 
      } 
      }); 
     } 
     }); 
    }); 
    } 
}); 

Вот моя попытка promisifying код выше:

var Promise = require('bluebird'); 
var s3 = new AWS.S3(); 
var sqs = new AWS.SQS(); 
Promise.promisifyAll(Object.getPrototypeOf(s3)); 
Promise.promisifyAll(Object.getPrototypeOf(sqs)); 

sqs.receiveMessageAsync({ 
    QueueUrl: settings.sqsQueueUrl[prdOrDev], 
    /* required */ 
    WaitTimeSeconds: 20, // to enable long polling, which polls all servers for any unprocessed SQS messages 
    VisibilityTimeout: 120, // without this longpolling didn't work. 
    MaxNumberOfMessages: 10 
}).then(function(data) { 
    var messages = data.Messages; 
    messages.forEach(function(message) { 
    var body = JSON.parse(message.Body); 
    var sesMsg = JSON.parse(body.Message); 
    s3.getObjectAsync({ 
     Bucket: sesMsg.receipt.action.bucketName, 
     Key: sesMsg.receipt.action.objectKey 
    }).then(function(data2) { 
     return sqs.deleteMessageAsync({ 
     QueueUrl: settings.sqsQueueUrl[prdOrDev], 
     /* required */ 
     ReceiptHandle: message.ReceiptHandle 
     }).catch(function(err) { 
     console.log('SQS deleteMessage failed: ', err, err.stack); 
     }); 
    }).catch(function(err) { 
     console.log('S3 getObject failed: ', err, err.stack); 
    }); 
    }); 
}).catch(function(err) { 
    notifyAdmin('SQS receiveMessage failed: ', err, err.stack); 
}); 

Я предполагаю, что это не самый лучший способ использовать посылы. Я особенно интересно, если есть лучший способ справиться с петлей Foreach, аналогично следующему примеру с главной страницы Bluebird в:

mongoClient.connectAsync('mongodb://localhost:27017/mydb') 
    .then(function(db) { 
     return db.collection('content').findAsync({}) 
    }) 
    .then(function(cursor) { 
     return cursor.toArrayAsync(); 
    }) 
    .then(function(content) { 
     res.status(200).json(content); 
    }) 
    .catch(function(err) { 
     throw err; 
    }); 

Итак, как мне лучше всего promisify фрагмент кода, в верхней части с помощью Bluebird?

ответ

1

В цикле forEach ваша функция then() разрушает цепочку, вы создаете обещания, но вы не «ждете» их. Обычным способом является сохранение всех обещаний в массиве и использование Promise.all(). Так с кодом:

sqs.receiveMessageAsync({ 
    QueueUrl: settings.sqsQueueUrl[prdOrDev], 
    /* required */ 
    WaitTimeSeconds: 20, // to enable long polling, which polls all servers for any unprocessed SQS messages 
    VisibilityTimeout: 120, // without this longpolling didn't work. 
    MaxNumberOfMessages: 10 
}).then(function(data) { 
    var messages = data.Messages; 
    var promises = []; 
    messages.forEach(function(message) { 
    var body = JSON.parse(message.Body); 
    var sesMsg = JSON.parse(body.Message); 

    var promise = s3.getObjectAsync({ 
     Bucket: sesMsg.receipt.action.bucketName, 
     Key: sesMsg.receipt.action.objectKey 
    }).then(function(data2) { 
     return sqs.deleteMessageAsync({ 
     QueueUrl: settings.sqsQueueUrl[prdOrDev], 
     /* required */ 
     ReceiptHandle: message.ReceiptHandle 
     }).catch(function(err) { 
     console.log('SQS deleteMessage failed: ', err, err.stack); 
     }); 
    }).catch(function(err) { 
     console.log('S3 getObject failed: ', err, err.stack); 
    }); 

    promises.push(promise); 
    }); 

    return Promise.all(promises); 
}).then(function(result) { 
    console.log('all done'); 
}).catch(function(err) { 
    notifyAdmin('SQS receiveMessage failed: ', err, err.stack); 
}); 

Вы также можете упростить promisify код для:

var s3 = Promise.promisifyAll(new AWS.S3()); 
var sqs = Promise.promisifyAll(new AWS.SQS()); 
+0

Я проверить это в ближайшее время, спасибо. Я принял идею для 'Promise.promisifyAll (Object.getPrototypeOf (s3));' часть из [этой темы] (http://stackoverflow.com/a/26475487/2234029). Я еще не проверил это, но, похоже, работает на других. – woz

+0

@woz, в той же теме, кто-то другой говорит, что это не сработало для него: http://stackoverflow.com/a/28973401/5388620 – Shanoor

+0

Я только что протестировал 'Promise.promisifyAll (Object.getPrototypeOf (s3)); 'и' Promise.promisifyAll (Object.getPrototypeOf (sqs)); '. Я могу подтвердить, что эти вызовы работают нормально, и их возвращаемые значения не должны быть привязаны к vars s3 и sqs. Кроме того, функция обратного вызова (err, data2) 'не работает одинаково в 'then'. Он должен быть 'function (data2)'. Продолжая мое расследование ... – woz