Redis + ActionController ::活线程不会死亡

背景:我们已经在我们现有的一个Rails应用程序中构build了一个聊天function。 我们使用新的ActionController :: Live模块并运行Puma(在生产中使用Nginx),并通过Redis订阅消息。 我们使用EventSource客户端来asynchronousbuild立连接。

问题摘要:连接终止时,线程永远不会死亡。

例如,如果用户离开,closures浏览器,甚至到应用程序中的不同页面,则会生成一个新线程(如预期的那样),但是旧线程继续存在。

我现在看到的问题是,当发生这些情况时,服务器无法知道浏览器端的连接是否终止,直到试图写入这个断开的stream,而浏览器永远不会发生已经离开原来的页面。

这个问题似乎被logging在github上 ,并且在StackOverflow上也有类似的问题(相当好的问题)和这里(关于获取活动线程的数量) 。

基于这些post,我能够想出的唯一解决scheme是实现一种线程/连接扑克。 试图写入一个断开的连接会生成一个IOError ,我可以捕获并正确closures连接,从而使线程死亡。 这是该解决scheme的控制器代码:

 def events response.headers["Content-Type"] = "text/event-stream" stream_error = false; # used by flusher thread to determine when to stop redis = Redis.new # Subscribe to our events redis.subscribe("message.create", "message.user_list_update") do |on| on.message do |event, data| # when message is received, write to stream response.stream.write("messageType: '#{event}', data: #{data}\n\n") end # This is the monitor / connection poker thread # Periodically poke the connection by attempting to write to the stream flusher_thread = Thread.new do while !stream_error $redis.publish "message.create", "flusher_test" sleep 2.seconds end end end rescue IOError logger.info "Stream closed" stream_error = true; ensure logger.info "Events action is quitting redis and closing stream!" redis.quit response.stream.close end 

(注意: events方法似乎在subscribe方法调用时被阻塞,其他所有(stream媒体)都正常工作,所以我认为这是正常的。)

(其他注意:冲洗线程的概念作为一个长时间运行的后台进程更有意义,有点像一个垃圾线程收集器。上面我的实现的问题是,每个连接都会产生一个新的线程,这是毫无意义的。试图实现这个概念应该更像是一个单独的过程,而不是我刚刚概述的那样,当我成功地将这个概念重新实现为一个后台过程时,我会更新这个post。

这个解决scheme的缺点是我们只是延迟或减less了问题,没有完全解决。 除了像ajax这样的其他请求,每个用户还有2个线程,从扩展的angular度来看,这看起来很糟糕; 对于具有许多可能的并发连接的更大的系统来说,这似乎是完全不可实现和不切实际的。

我觉得我错过了一些至关重要的东西; 我觉得有些难以相信,Rails有一个function,如果没有像我所做的那样实现一个自定义的连接检查程序,那么这个function显然被破坏了。

问题:我们如何让连接/线程死掉,而不执行诸如“连接扑克”或垃圾线程收集器这样的内容呢?

一如既往让我知道我是否遗漏了任何东西。

更新只是添加了一些额外的信息:github上的Huetsch发表了这个评论,指出SSE是基于TCP的,通常在连接closures时发送一个FIN数据包,让另一端(服务器在这种情况下)知道它的安全closures连接。 Huetsch指出浏览器不是发送这个数据包(可能是EventSource库中的一个错误?),或者Rails没有捕获它或者做任何事情(绝对是Rails中的一个错误,如果是这样的话)。 search继续…

另一个更新使用Wireshark,我确实可以看到正在发送的FIN数据包。 无可否认,我对协议级别的东西不是非常了解或者有经验,但是从我所知道的情况来看,当我使用浏览器中的EventSourcebuild立SSE连接时,我肯定会发现从浏览器发送的FIN数据包,删除该连接(意味着没有SSE)。 虽然我的TCP知识不是很高,但这似乎表明,这个连接确实正在被客户正确地终止。 也许这表明了Puma或Rails中的一个bug。

还有另一个更新 @JamesBoutcher / boutcheratwest(github)指出我在redis网站上就这个问题进行了讨论 ,特别是关于.(p)subscribe方法从不closures的事实。 该网站上的海报指出了我们在这里发现的同样的事情,当客户端连接closures时,Rails环境从未被通知,因此无法执行.(p)unsubscribe方法。 他询问了.(p)subscribe方法的超时时间,我认为这也是可行的,尽pipe我不确定哪种方法(我上面描述的连接扑克,或者超时build议)是一个更好的解决scheme。 理想情况下,对于连接扑克解决scheme,我想find一种方法来确定连接是否在另一端closures而不写入stream。 正如你所看到的,我们必须实现客户端代码来分别处理我的“戳动”信息,而我认为这些信息是非常突兀和愚蠢的。

我刚刚做了一个解决scheme(从@teeg借了很多),似乎工作正常(没有失败testing,​​tho)

configuration/初始化/ redis.rb

 $redis = Redis.new(:host => "xxxx.com", :port => 6379) heartbeat_thread = Thread.new do while true $redis.publish("heartbeat","thump") sleep 30.seconds end end at_exit do # not sure this is needed, but just in case heartbeat_thread.kill $redis.quit end 

然后在我的控制器中:

 def events response.headers["Content-Type"] = "text/event-stream" redis = Redis.new(:host => "xxxxxxx.com", :port => 6379) logger.info "New stream starting, connecting to redis" redis.subscribe(['parse.new','heartbeat']) do |on| on.message do |event, data| if event == 'parse.new' response.stream.write("event: parse\ndata: #{data}\n\n") elsif event == 'heartbeat' response.stream.write("event: heartbeat\ndata: heartbeat\n\n") end end end rescue IOError logger.info "Stream closed" ensure logger.info "Stopping stream thread" redis.quit response.stream.close end 

我目前正在制作一个围绕ActionController:Live,EventSource和Puma的应用程序,以及那些遇到closuresstream等问题的应用程序,而不是拯救IOError ,在Rails 4.2中,您需要救援ClientDisconnected 。 例:

 def stream #Begin is not required twitter_client = Twitter::Streaming::Client.new(config_params) do |obj| # Do something end rescue ClientDisconnected # Do something when disconnected ensure # Do something else to ensure the stream is closed end 

我发现这个方便的提示从这个论坛post(一路在底部): http : //railscasts.com/episodes/401-actioncontroller-live?view=comments

在@James Boutcher的基础上,我使用了两个worker的集群Puma中的以下内容,以便在config / initializers / redis.rb中为心跳创build一个线程:

configuration/ puma.rb

 on_worker_boot do |index| puts "worker nb #{index.to_s} booting" create_heartbeat if index.to_i==0 end def create_heartbeat puts "creating heartbeat" $redis||=Redis.new heartbeat = Thread.new do ActiveRecord::Base.connection_pool.release_connection begin while true hash={event: "heartbeat",data: "heartbeat"} $redis.publish("heartbeat",hash.to_json) sleep 20.seconds end ensure #no db connection anyway end end end 

这是一个不使用心跳的更简单的解决scheme。 经过大量的研究和实验,下面是我使用的sinatra + sinatra sse gem的代码(应该很容易适应Rails 4):

 class EventServer < Sinatra::Base include Sinatra::SSE set :connections, [] . . . get '/channel/:channel' do . . . sse_stream do |out| settings.connections << out out.callback { puts 'Client disconnected from sse'; settings.connections.delete(out); } redis.subscribe(channel) do |on| on.subscribe do |channel, subscriptions| puts "Subscribed to redis ##{channel}\n" end on.message do |channel, message| puts "Message from redis ##{channel}: #{message}\n" message = JSON.parse(message) . . . if settings.connections.include?(out) out.push(message) else puts 'closing orphaned redis connection' redis.unsubscribe end end end end end 

redis连接阻止on.message,只接受(p)订阅/(p)取消订阅命令。 取消订阅后,redis连接将不再被阻止,并且可以通过初始sse请求实例化的Web服务器对象释放。 当您在redis上收到消息时会自动清除,并且sse连接到浏览器不再存在于收集数组中。

这里你是超时的解决scheme,它将退出阻塞Redis。(p)订阅呼叫并杀死未使用的连接。

 class Stream::FixedController < StreamController def events # Rails reserve a db connection from connection pool for # each request, lets put it back into connection pool. ActiveRecord::Base.clear_active_connections! # Last time of any (except heartbeat) activity on stream # it mean last time of any message was send from server to client # or time of setting new connection @last_active = Time.zone.now # Redis (p)subscribe is blocking request so we need do some trick # to prevent it freeze request forever. redis.psubscribe("messages:*", 'heartbeat') do |on| on.pmessage do |pattern, event, data| # capture heartbeat from Redis pub/sub if event == 'heartbeat' # calculate idle time (in secounds) for this stream connection idle_time = (Time.zone.now - @last_active).to_i # Now we need to relase connection with Redis.(p)subscribe # chanel to allow go of any Exception (like connection closed) if idle_time > 4.minutes # unsubscribe from Redis because of idle time was to long # that's all - fix in (almost)one line :) redis.punsubscribe end else # save time of this (last) activity @last_active = Time.zone.now end # write to stream - even heartbeat - it's sometimes chance to # capture dissconection error before idle_time response.stream.write("event: #{event}\ndata: #{data}\n\n") end end # blicking end (no chance to get below this line without unsubscribe) rescue IOError Logs::Stream.info "Stream closed" rescue ClientDisconnected Logs::Stream.info "ClientDisconnected" rescue ActionController::Live::ClientDisconnected Logs::Stream.info "Live::ClientDisconnected" ensure Logs::Stream.info "Stream ensure close" redis.quit response.stream.close end end 

你必须使用红色(p)取消订阅来结束这个阻塞呼叫。 没有例外可以打破这一点。

我简单的应用程序与此修复信息: https : //github.com/piotr-kedziak/redis-subscribe-stream-puma-fix

而不是发送心跳给所有的客户,可能会更容易为每个连接设置一个看门狗。 [感谢@NeilJewers]

 class Stream::FixedController < StreamController def events # Rails reserve a db connection from connection pool for # each request, lets put it back into connection pool. ActiveRecord::Base.clear_active_connections! redis = Redis.new watchdog = Doberman::WatchDog.new(:timeout => 20.seconds) watchdog.start # Redis (p)subscribe is blocking request so we need do some trick # to prevent it freeze request forever. redis.psubscribe("messages:*") do |on| on.pmessage do |pattern, event, data| begin # write to stream - even heartbeat - it's sometimes chance to response.stream.write("event: #{event}\ndata: #{data}\n\n") watchdog.ping rescue Doberman::WatchDog::Timeout => e raise ClientDisconnected if response.stream.closed? watchdog.ping end end end rescue IOError rescue ClientDisconnected ensure response.stream.close redis.quit watchdog.stop end end 
Interesting Posts