2017-02-23 3 views
1

Я разрабатываю сервер node.js в структуре Hapi. Я принял RabbitMQ (amqp), чтобы поставить в очередь мои задачи. Хотя, как только запрос отправляется, вместо немедленного ответа на запрос, сообщение отправляется на сервер Rabbit, где фактическая функция помещается в качестве потребителя. Затем потребитель должен вернуть результат функции (запрос, ответ), и пусть функция ответит на нее.Node.js транспортные данные между сценариями

Теперь мое решение создает переменную в моем рабочем файле (где находится потребитель amqp) и экспортирует его. Затем в индексном файле (мой основной скрипт с обработчиком маршрута) я импортирую переменную. Как только какой-либо запрос будет получен, он отправит сообщение на сервер RabbitMQ, и сервер изменит эту переменную. Затем, обратно в индексный файл, скрипт обновляет значение переменной и затем отвечает на него. Очевидно, что из-за асинхронности программа отвечает на результат предыдущего запроса.

Я провел некоторое исследование и обнаружил, что мы не должны делиться некоторой переменной между сценариями. у кого-нибудь есть решение? Моя цель заключается в том, что я могу поместить своих потребителей в сценарий. Как только я запустил сценарий, потребители будут готовы получить любое соответствующее сообщение. Затем в моем индексном файле после получения некоторого запроса он отправляет сообщение серверу RabbitMQ. И тогда он должен захватить результат потребителя и ответить на него.

Ниже мой код:

index.ts

import * as Joi from "joi"; 
import * as amqp from "amqplib/callback_api"; 
import * as waitUntil from "wait-until"; 

import * as repository from "./repository"; 
import * as worker from "./worker"; 

// defien variables from internal modules 
let greeter = new repository.Greeter(); 

// register type 
import {Register} from "../../interfaces"; 

// define amqp related stuff 
let greeterReply = worker.greeterReply; 

// helloWorld config including handler, validate and auth 
export let register: Register = (server, options, next) => { 
    server.route([ 
    { 
     method: "GET", 
     path: "/greeter", 
     config: { 
      handler: (request, reply) => { 
       let q: string = "greeter"; 
       let requestQuery = request.query; 
       let requestString = JSON.stringify(requestQuery); 
       amqp.connect("amqp://192.168.0.31", (err, conn) => { 
        conn.createChannel((err, ch) => { 
         ch.assertQueue(q, {durable: false}); 
         ch.sendToQueue(q, new Buffer(requestString)); 
        }); 
       }); 
       waitUntil(500, 10, function condition() { 
        greeterReply = worker.greeterReply; 
        return (greeterReply !== null); 
       }, function done(result) { 
        reply(greeterReply); 
        greeterReply = null; 
       }); 
      }, 
      validate: { 
       query: { 
        name: Joi.string(), 
        age: Joi.number() 
       } 
      }, 
     } 
    } 
    ]); 
    next(); 
}; 

register.attributes = { 
    name: "greeter", 
    version: "1.0" 
}; 

worker.ts

// import external modules 
import * as amqp from "amqplib/callback_api"; 

// import internal modules 
import * as repository from "./repository"; 
import * as indexModule from "./index"; 

// defien variables from internal modules 
let greeter = new repository.Greeter(); 

export let greeterReply = null; 

amqp.connect("amqp://192.168.0.31", (err, conn) => { 
    conn.createChannel((err, ch) => { 
     let q: string = "greeter"; 
     ch.assertQueue(q, {durable: false}); 
     ch.consume(q, function (requestString) { 
      let newRequest = JSON.parse(requestString.content.toString()); 
      console.log("replied via amqp"); 
      let result: string = "how are you"; 
      result = greeter.helloWorld(newRequest.name, newRequest.age); 
      console.log("the result is: ", result); 
      greeterReply = result; 
     }, {noAck: true}); 
    }); 
}); 
+0

Взгляните на плагины, чтобы инкапсулировать этот тип вещей. Также используйте обратный вызов в вашей рабочей функции для запуска и управления обработкой. Используя классы, вы можете инкапсулировать состояние каждого запроса, однако, если вы планируете высокую пропускную способность, это может стать проблемой. –

+0

Если я использую функцию обратного вызова, мне нужно вызвать функцию в файле индекса правильно? Это не то, что я намереваюсь сделать. У меня довольно много маршрутов. Не могли бы вы подробно объяснить метод плагина? Как я могу зарегистрировать amqp в качестве плагина? – zhangjinzhou

ответ

1

Что вам требуется является своего рода RPC над RabbitMQ. Он поддерживается RabbitMQ, как показано здесь в tutorials here.

Вы можете либо реализовать его самостоятельно, используя amqplib, который вы уже используете, либо можете использовать определенный модуль, например amqp-rpc, чтобы сделать это за вас.

+0

Я считаю, что это решение! Чем ты приятель. Но, не используя RPC, я замечаю, что моя программа (коды выше) отлично работает, если я не запускаю рабочего в терминале. Похоже, что потребитель работает как дочерний процесс, когда издатель отправляет сообщение. Вы знаете причину? @tbking – zhangjinzhou

+0

Это потому, что вы импортировали рабочий файл, а это значит, что вам нужно всего лишь запустить индексный файл, и он запустит рабочий файл и вызовет самого рабочего. Ваш код такой, что вам не нужно запускать эти файлы отдельно. – tbking

+0

Спасибо! В этом случае, по-вашему, мне все еще нужно использовать RPC? Если да, какую пользу я могу получить? – zhangjinzhou

Смежные вопросы