lib/slack/real_time/concurrency/async.rb in slack-ruby-client-0.14.1 vs lib/slack/real_time/concurrency/async.rb in slack-ruby-client-0.14.2
- old
+ new
@@ -1,71 +1,103 @@
require 'async/websocket'
+require 'async/notification'
require 'async/clock'
module Slack
module RealTime
module Concurrency
module Async
- class Reactor < ::Async::Reactor
- def_delegators :@timers, :cancel
- end
-
class Client < ::Async::WebSocket::Client
extend ::Forwardable
def_delegators :@driver, :on, :text, :binary, :emit
end
class Socket < Slack::RealTime::Socket
attr_reader :client
+ def start_sync(client)
+ start_reactor(client).wait
+ end
+
def start_async(client)
- @reactor = Reactor.new
Thread.new do
+ start_reactor(client)
+ end
+ end
+
+ def start_reactor(client)
+ Async do |task|
+ @restart = ::Async::Notification.new
+
if client.run_ping?
- @reactor.every(client.websocket_ping) do
- client.run_ping!
+ @ping_task = task.async do |subtask|
+ subtask.annotate 'client keep-alive'
+
+ # The timer task will naturally exit after the driver is set to nil.
+ while @restart
+ subtask.sleep client.websocket_ping
+ client.run_ping! if @restart
+ end
end
end
- @reactor.run do |task|
- task.async do
- client.run_loop
+
+ while @restart
+ @client_task.stop if @client_task
+
+ @client_task = task.async do |subtask|
+ begin
+ subtask.annotate 'client run-loop'
+ client.run_loop
+ rescue ::Async::Wrapper::Cancelled => e
+ # Will get restarted by ping worker.
+ client.logger.warn(subtask.to_s) { e.message }
+ end
end
+
+ @restart.wait
end
+
+ @ping_task.stop if @ping_task
end
end
- def restart_async(client, new_url)
+ def restart_async(_client, new_url)
@url = new_url
@last_message_at = current_time
- return unless @reactor
- @reactor.async do
- client.run_loop
- end
+ @restart.signal if @restart
end
- def disconnect!
- super
- @reactor.cancel
- end
-
def current_time
::Async::Clock.now
end
def connect!
super
run_loop
end
+ # Kill the restart/ping loop.
+ def disconnect!
+ super
+ ensure
+ if restart = @restart
+ @restart = nil
+ restart.signal
+ end
+ end
+
+ # Close the socket.
def close
- @closing = true
- @driver.close if @driver
super
+ ensure
+ if @socket
+ @socket.close
+ @socket = nil
+ end
end
def run_loop
- @closing = false
while @driver && @driver.next_event
# $stderr.puts event.inspect
end
end