4

Я использовал Heroku tutorial для реализации веб-узлов.Thread running in Middleware использует старую версию переменной экземпляра родителя

Он работает правильно с тонким, но не работает с Единорогом и Пумой.

Также реализовано эхо-сообщение, которое отвечает на сообщение клиента. Он работает правильно на каждом сервере, поэтому нет проблем с реализацией websockets.

Настройка Redis также верна (она захватывает все сообщения и выполняет код внутри блока subscribe).

Как это работает в настоящее время:

На старте сервера, пустой @clients массив инициализируется. Затем запускается новый поток, который прослушивает Redis и который предназначен для отправки этого сообщения соответствующему пользователю из массива @clients.

Нагрузка на страницу создается новое соединение с веб-соединением, оно хранится в массиве @clients.

Если мы получим сообщение от браузера, мы отправим его всем клиентам, связанным с одним и тем же пользователем (эта часть работает правильно как с Thin, так и с Puma).

Если мы получим сообщение от Redis, мы также рассмотрим все подключения пользователя, хранящиеся в массиве @clients. Это где странная вещь происходит:

  • При работе с тонкими, он находит связи в @clients массиве и отправляет им сообщение.

  • При работе с Puma/Unicorn, @clients массив всегда пустой, даже если мы попытаемся его в таком порядке (без перезагрузки страницы или что-нибудь):

    1. Отправить сообщение из браузера ->@clients.length 1 , сообщение доставляется
    2. Отправить сообщение через Redis ->@clients.length 0, сообщение теряется
    3. Отправить сообщение из браузера ->@clients.length еще 1, сообщение доставляется

Не могли бы вы прояснить мне, что мне не хватает?

Связанные конфигурации сервера Puma:

workers 1 
threads_count = 1 
threads threads_count, threads_count 

Связанные код промежуточного слоя:

require 'faye/websocket' 

class NotificationsBackend 

    def initialize(app) 
    @app  = app 
    @clients = [] 
    Thread.new do 
     redis_sub = Redis.new 
     redis_sub.subscribe(CHANNEL) do |on| 
     on.message do |channel, msg| 
      # logging @clients.length from here will always return 0 
      # [..] retrieve user 
      send_message(user.id, { message: "ECHO: #{event.data}"}) 
     end 
     end 
    end 
    end 

    def call(env) 
    if Faye::WebSocket.websocket?(env) 
     ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME }) 
     ws.on :open do |event| 
     # [..] retrieve current user 
     if user 
      # add ws connection to @clients array 
     else 
      # close ws 
     end 
     end 

     ws.on :message do |event| 
     # [..] retrieve current user 
     Redis.current.publish({user_id: user.id, { message: "ECHO: #{event.data}"}}) 
     end 

     ws.rack_response 
    else 
     @app.call(env) 
    end 
    end 
    def send_message user_id, message 
    # logging @clients.length here will always return correct result 
    # cs = all connections which belong to that client 
    cs.each { |c| c.send(message.to_json) } 
    end 
end 
+0

если вы регистрируете идентификатор процесса, когда ваш поток redis получает событие, а когда вы изменяете @clients, вы получаете то же значение? –

+0

@FrederickCheung только что проверил, они разные. Метод Initialize и поток прослушивателя Redis имеют одинаковый PID, но он отличается (ниже), чем тот, где изменен '@ clients'. BTW все клиенты хранятся в одном и том же процессе (все они принадлежат одному массиву PID и '@ clients') –

ответ

4

Unicorn (и, видимо, пума) и запустить мастер процесса, а затем раскошелиться один или несколько рабочих. (или, по крайней мере, представляет иллюзию копирования - фактическая копия обычно происходит только при записи на страницы) весь ваш процесс, но только новый поток, который называется fork, существует в новом процессе.

Очевидно, что ваше приложение инициализируется перед разветвлением - обычно это делается так, чтобы работники могли быстро начать работу и извлечь выгоду из копирования при экономии памяти.Как следствие, ваш контрольный поток redis работает только в мастер-процессе, тогда как @clients изменяется в дочернем процессе.

Возможно, вы обойдете это, либо отложив создание потока redis, либо отключив предварительную загрузку приложения, однако вы должны знать, что ваша установка не позволит вам масштабироваться за пределы одного рабочего процесса (который с puma и дружественным к потоку JVM, как JRuby будет меньше ограничений)

+0

Спасибо за объяснение. Что касается отключения предварительной загрузки приложений (хотя мне не нравится эта идея, но сейчас все в порядке): как я понимаю, единственная проблема, которую она будет поднимать, связана с этим блоком 'on.message'. Поэтому, если я опубликую сообщение, полученное в Redis, оно будет получено всеми работниками, и это позволит мне масштабироваться до более чем одного работника. Есть что-то еще, что мне не хватает? –

+0

Я неправильно понимаю, что вы делаете, но в любое время, когда вы полагаетесь на обмен некоторой переменной, которая хранится только в памяти, вы будете ограничены в своей способности добавлять дополнительные процессы –

3

Только в случае, если кто-то столкнется с той же проблемой, вот два решения я пришел с:

1. Отключить приложение поджимать (это был первый раствор, который я придумал)

Просто удалите preload_app! из файла puma.rb. Поэтому все потоки будут иметь свою собственную переменную @clients. И они будут доступны другими методами промежуточного программного обеспечения (например, call и т. Д.)

Недостаток: вы потеряете все преимущества предварительной загрузки приложений. Это нормально, если у вас есть только 1 или 2 сотрудника с несколькими потоками, но если вам нужно много их, то лучше иметь предварительную загрузку приложения. Поэтому я продолжил свои исследования, и вот еще одно решение:

2. Перемещение инициализации нить из initialize метода (это то, что я использую сейчас)

Например, я перенес ее в call метод, так это как класс промежуточного программного код выглядит следующим образом:

attr_accessor :subscriber 

def call(env) 
    @subscriber ||= Thread.new do # if no subscriber present, init new one 
    redis_sub = Redis.new(url: ENV['REDISCLOUD_URL']) 
    redis_sub.subscribe(CHANNEL) do |on| 
     on.message do |_, msg| 
     # parsing message code here, retrieve user 
     send_message(user.id, { message: "ECHO: #{event.data}"}) 
     end 
    end 
    end 
    # other code from method 
end 

Оба решения решить ту же проблему: Redis прослушивание поток будет инициализирован для каждого Puma рабочего/нити, а не для основного процесса (который на самом деле не обслуживающая запросы).

Смежные вопросы