lib/action_cable/server/worker.rb in actioncable-5.0.0.beta3 vs lib/action_cable/server/worker.rb in actioncable-5.0.0.beta4
- old
+ new
@@ -10,55 +10,55 @@
thread_mattr_accessor :connection
define_callbacks :work
include ActiveRecordConnectionManagement
+ attr_reader :executor
+
def initialize(max_size: 5)
- @pool = Concurrent::ThreadPoolExecutor.new(
+ @executor = Concurrent::ThreadPoolExecutor.new(
min_threads: 1,
max_threads: max_size,
max_queue: 0,
)
end
- def async_invoke(receiver, method, *args)
- @pool.post do
- invoke(receiver, method, *args)
- end
+ # Stop processing work: any work that has not already started
+ # running will be discarded from the queue
+ def halt
+ @executor.kill
end
- def invoke(receiver, method, *args)
- begin
- self.connection = receiver
+ def stopping?
+ @executor.shuttingdown?
+ end
- run_callbacks :work do
- receiver.send method, *args
- end
- rescue Exception => e
- logger.error "There was an exception - #{e.class}(#{e.message})"
- logger.error e.backtrace.join("\n")
+ def work(connection)
+ self.connection = connection
- receiver.handle_exception if receiver.respond_to?(:handle_exception)
- ensure
- self.connection = nil
+ run_callbacks :work do
+ yield
end
+ ensure
+ self.connection = nil
end
- def async_run_periodic_timer(channel, callback)
- @pool.post do
- run_periodic_timer(channel, callback)
+ def async_invoke(receiver, method, *args, connection: receiver)
+ @executor.post do
+ invoke(receiver, method, *args, connection: connection)
end
end
- def run_periodic_timer(channel, callback)
- begin
- self.connection = channel.connection
+ def invoke(receiver, method, *args, connection:)
+ work(connection) do
+ begin
+ receiver.send method, *args
+ rescue Exception => e
+ logger.error "There was an exception - #{e.class}(#{e.message})"
+ logger.error e.backtrace.join("\n")
- run_callbacks :work do
- callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
+ receiver.handle_exception if receiver.respond_to?(:handle_exception)
end
- ensure
- self.connection = nil
end
end
private