lib/slack/real_time/concurrency/async.rb in slack-ruby-client-0.14.1 vs lib/slack/real_time/concurrency/async.rb in slack-ruby-client-0.14.2

- old
+ new

@@ -1,71 +1,103 @@ require 'async/websocket' +require 'async/notification' require 'async/clock' module Slack module RealTime module Concurrency module Async - class Reactor < ::Async::Reactor - def_delegators :@timers, :cancel - end - class Client < ::Async::WebSocket::Client extend ::Forwardable def_delegators :@driver, :on, :text, :binary, :emit end class Socket < Slack::RealTime::Socket attr_reader :client + def start_sync(client) + start_reactor(client).wait + end + def start_async(client) - @reactor = Reactor.new Thread.new do + start_reactor(client) + end + end + + def start_reactor(client) + Async do |task| + @restart = ::Async::Notification.new + if client.run_ping? - @reactor.every(client.websocket_ping) do - client.run_ping! + @ping_task = task.async do |subtask| + subtask.annotate 'client keep-alive' + + # The timer task will naturally exit after the driver is set to nil. + while @restart + subtask.sleep client.websocket_ping + client.run_ping! if @restart + end end end - @reactor.run do |task| - task.async do - client.run_loop + + while @restart + @client_task.stop if @client_task + + @client_task = task.async do |subtask| + begin + subtask.annotate 'client run-loop' + client.run_loop + rescue ::Async::Wrapper::Cancelled => e + # Will get restarted by ping worker. + client.logger.warn(subtask.to_s) { e.message } + end end + + @restart.wait end + + @ping_task.stop if @ping_task end end - def restart_async(client, new_url) + def restart_async(_client, new_url) @url = new_url @last_message_at = current_time - return unless @reactor - @reactor.async do - client.run_loop - end + @restart.signal if @restart end - def disconnect! - super - @reactor.cancel - end - def current_time ::Async::Clock.now end def connect! super run_loop end + # Kill the restart/ping loop. + def disconnect! + super + ensure + if restart = @restart + @restart = nil + restart.signal + end + end + + # Close the socket. def close - @closing = true - @driver.close if @driver super + ensure + if @socket + @socket.close + @socket = nil + end end def run_loop - @closing = false while @driver && @driver.next_event # $stderr.puts event.inspect end end