lib/action_cable/server/worker.rb in actioncable-5.0.0.beta3 vs lib/action_cable/server/worker.rb in actioncable-5.0.0.beta4

- old
+ new

@@ -10,55 +10,55 @@ thread_mattr_accessor :connection define_callbacks :work include ActiveRecordConnectionManagement + attr_reader :executor + def initialize(max_size: 5) - @pool = Concurrent::ThreadPoolExecutor.new( + @executor = Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: max_size, max_queue: 0, ) end - def async_invoke(receiver, method, *args) - @pool.post do - invoke(receiver, method, *args) - end + # Stop processing work: any work that has not already started + # running will be discarded from the queue + def halt + @executor.kill end - def invoke(receiver, method, *args) - begin - self.connection = receiver + def stopping? + @executor.shuttingdown? + end - run_callbacks :work do - receiver.send method, *args - end - rescue Exception => e - logger.error "There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") + def work(connection) + self.connection = connection - receiver.handle_exception if receiver.respond_to?(:handle_exception) - ensure - self.connection = nil + run_callbacks :work do + yield end + ensure + self.connection = nil end - def async_run_periodic_timer(channel, callback) - @pool.post do - run_periodic_timer(channel, callback) + def async_invoke(receiver, method, *args, connection: receiver) + @executor.post do + invoke(receiver, method, *args, connection: connection) end end - def run_periodic_timer(channel, callback) - begin - self.connection = channel.connection + def invoke(receiver, method, *args, connection:) + work(connection) do + begin + receiver.send method, *args + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") - run_callbacks :work do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) + receiver.handle_exception if receiver.respond_to?(:handle_exception) end - ensure - self.connection = nil end end private