2015-10-19 3 views
2

Это беспокоило меня в течение некоторого времени - кажется, что-то, что должно быть полностью выполнимо, но я застрял.Ruby Threads and Websockets

У меня есть небольшая программа Ruby, которая просто действует как промежуточный. Он длится (несколько минут), блокируя действие (через интерфейс FFI), но предполагается, что он отправляет периодические обновления, которые он получает от этого действия, посредством обратных вызовов в основное приложение Meteor через соединение DDP.

Оба компонента этой программы работают самостоятельно. Через систему я закатил свой собственный, а также metybur gem, я могу общаться с приложением Meteor. И, если я просто использую puts для вывода данных из обратных вызовов интерфейса FFI, я тоже получаю их. (За исключением того, что по какой-то другой причине я не могу на это положиться, действие FFI/блокировки бессильно выходит из строя, если оно находится в блоке Thread.new.)

По какой-то причине, однако, когда я пытаюсь отправить данные в Метеор приложение ничего не происходит. ws.send (on EventMachine) возвращает true, хотя на самом деле его не вызвали, даже если я положил его в свой собственный блок Thread.new.

Часть меня подозревает (хотя не может выяснить, как ее протестировать), что соединение потеряно, потому что приложение Ruby не может обрабатывать запросы keepingive ping/pong во время блокировки.

Я пробовал EM.spawn из EventMachine для процесса блокировки. Я попытался запустить EventMachine в своем потоке, но ничего не работает.

Любопытно, если есть лучшие практики для чего-то подобного, чтобы иметь возможность отслеживать часть приложения EventMachine приложения даже во время операций блокировки с интенсивным использованием процессора?

+1

Пожалуйста, разместите код, чтобы мы могли видеть, что вы пробовали. Кроме того, если вы считаете, что есть проблема с клиентом веб-сокета event-машины, возможно, попробуйте другие клиенты и посмотрите, как это происходит (я написал сервер сервера [GRHttp] (https://github.com/boazsegev/GRHttp) + клиент, который требует Ruby> = 2.1.0, но в дикой природе есть больше решений). – Myst

ответ

1

EDITED

После нашего обсуждения в комментариях, я решил пересмотреть код, который я и написать небольшую DDP инкапсуляцию Усиливая Iodine's WebSocket клиента (который я предпочитаю, так как я автор).

Должен признаться, мне очень понравилось думать об этом вопросе. Прикрепленный файл представляет собой упрощенный код для сокета Метеор с использованием иода. Это поистине базовое и включает только: обновление соединения, если оно отбрасывается, завершение рукопожатия и ответ на пинг-понг.

Чтобы использовать этот код вместе с концепцией FFI рабочего процесса, начатого в первом ответе, используйте:

# create the connection to the Meteor server 
# and setup the callback for incoming messages: 
meteor_ddp = IodineDDP.new('ws://chat.n-k.de/websocket') do |message| 
    Iodine.debug "got message #{message}, it's a Hash" 
end 

# next, create a dedicated thread for the FFI, 
# it will run until the FFI had finished 
# or the application exits. 
Thread.new do 
    # initialize FFI interface 
    data = StringIO.new "initialize FFI interface - StringIO will be our data for now" 
    # imagine it takes time 
    sleep 1 
    # Meteor will respond to these with error messages 
    (meteor_ddp << data.read(3)) && sleep(0.2) until data.eof? 
    sleep 1 
    Iodine.signal_exit 
end 

# it seems Meteor sends pings and since we already answer them in the 
# class object, it should be enough... 
# but we can do it too, if we want to: 
Iodine.run_every(5) { meteor_ddp << {msg: :ping}.to_json } 

Как к классу соединений Метеор DDP, это, вероятно, может быть достигнуто следующим образом:

require 'iodine/client' 

class IodineDDP 
    attr_reader :session 
    attr_reader :server_id 
    def initialize url, &block 
     @url = url 
     @ddp_initialized = false 
     @session = nil 
     @server_id = nil 
     @block = block 
     @closed = false 
     connect_websocket 
    end 

    def << message 
     Iodine.debug "Writing message #{message}" 
     ensure_connection 
     @ws << message 
    end 
    alias :write :<< 

    def close 
     @closed = true 
     @ws.on_close { nil } 
     @ws.close 
    end 

    protected 

    def on_message data 
     # make sure the DDP handshake is complete 
     return handshake data unless @ddp_initialized 
     data = JSON.parse(data) 
     Iodine.debug "Got message: #{data}" 
     return write({msg: 'pong', id: data['id'] }.to_json) if data['msg'] == 'ping' 
     return true if data['msg'] == 'pong' 
     @block.call data 
    end 
    def on_close 
     @ddp_initialized = false 
     connect_websocket 
    end 

    def ensure_connection 
     return true unless @ws.closed? || [email protected]_initialized 
     raise 'This DDP instance was shutdown using `close`, it will not be renewed' if @closed 
     raise 'DDP disconnected - not enough threads to ensure reconnection' if (@ws.closed? || [email protected]_initialized) && Iodine.threads == 1 
     timeout = Iodine.time + 3 
     sleep 0.2 until @ddp_initialized && Iodine.time <= timeout 
     raise 'DDP disconnected - reconnection timed-out.' if @ws.closed? || [email protected]_initialized 
    end 

    def connect_websocket 
     @___on_message_proc ||= method(:on_message) 
     @___on_close_proc ||= method(:on_close) 
     @ws = ::Iodine::Http::WebsocketClient.connect(@url, on_message: @___on_message_proc, on_open: @___on_open_proc, on_close: @___on_close_proc) 
     # inform 
     Iodine.debug "initiating a new DDP connection to #{@url}" 
     # start the DDP handshake 
     handshake 
    end 
    def handshake last_message = nil 
     raise 'Handshake failed because the websocket was closed or missing' if @ws.nil? || @ws.closed? 
     unless last_message # this is the first message sent 
     Iodine.debug "Meteor DDP handshake initiated." 
     msg = {msg: "connect", version: "1", support: ["1"]} 
     msg[:session] = @session if @session 
     return(@ws << msg.to_json) 
     end 
     message = JSON.parse(last_message) 
     raise "Meteor DDP connection error, requires version #{message['version']}: #{last_message}" if message['msg'] == 'failed' 
     if message['msg'] == 'connected' 
     # inform 
     Iodine.debug "Meteor DDP handshake complete." 
     @session = message['session'] 
     return @ddp_initialized = true 
     else 
     return @server_id = message['server_id'] if message['server_id'] 
     Iodine.error "Invalid handshake data - closing connection." 
     close 
     end  
    end 
end 

# we need at least two threads for the IodineDDP#ensure_connection 
Iodine.threads = 3 

# # if we are inside a larger application, call: 
# Iodine.force_start! 

# # if we are on irb: 
exit 
# no need to write anything if this is the whole of the script 
+0

Спасибо! Я заинтригован этой идеей, но мне трудно работать с иодом, чтобы работать с Метером. Я отправляю то же самое сообщение JSON ('({msg:" connect ", version:" 1 ", support: [" 1 "]}. To_json)'), как у меня с другими библиотеками, и я не могу показаться для подключения, и я не могу сказать, в чем разница. – Jared

+0

@ Jared, я прочитал некоторый код метеора, и кажется, что метеорит устанавливает специальные пути для разных коннекторов (например, '/ websocket','/iframe.html') ...Вы пытались подключиться к серверу, добавив '/ websocket' в конец вашего URL-адреса? ... можете ли вы изменить свой вопрос и опубликовать код, который вы пробовали? или (возможно, более вежливый), открываем новый вопрос, связанный с Iodine, и размещаем ссылку на вопрос здесь в комментарии? – Myst

+0

Подробнее о Метеор, кажется [у них есть протокол (DDP)] (https://atmospherejs.com/meteor/ddp), который используется поверх Websockets, так же, как REST используется поверх HTTP. Чтобы установить соединение, вам понадобится как правильный URL (добавить '/ websocket'), так и правильную последовательность входа. Вы можете найти письменные DDP-клиенты (для вдохновения или для использования, я думаю) [здесь] (http://meteorpedia.com/read/DDP_Clients#Ruby). – Myst