lib/toiler/actor/supervisor.rb in toiler-0.3.0 vs lib/toiler/actor/supervisor.rb in toiler-0.3.1.beta1
- old
+ new
@@ -1,55 +1,55 @@
-require 'toiler/actor/fetcher'
-require 'toiler/actor/processor'
-
-module Toiler
- module Actor
- # Actor that starts and supervises Toiler's actors
- class Supervisor < Concurrent::Actor::RestartingContext
- include Utils::ActorLogging
-
- attr_accessor :client
-
- def initialize
- @client = ::Aws::SQS::Client.new
- spawn_fetchers
- spawn_processors
- end
-
- def on_message(_msg)
- pass
- end
-
- def queues
- Toiler.worker_class_registry
- end
-
- def spawn_fetchers
- queues.each do |queue, _klass|
- begin
- fetcher = Actor::Fetcher.spawn! name: "fetcher_#{queue}".to_sym,
- supervise: true, args: [queue, client]
- Toiler.set_fetcher queue, fetcher
- rescue StandardError => e
- error "Failed to start Fetcher for queue #{queue}: #{e.message}\n#{e.backtrace.join("\n")}"
- end
- end
- end
-
- def spawn_processors
- queues.each do |queue, klass|
- name = "processor_pool_#{queue}".to_sym
- count = klass.concurrency
- begin
- pool = Concurrent::Actor::Utils::Pool.spawn! name, count do |index|
- Actor::Processor.spawn name: "processor_#{queue}_#{index}".to_sym,
- supervise: true, args: [queue]
- end
- Toiler.set_processor_pool queue, pool
- rescue StandardError => e
- error "Failed to spawn Processor Pool for queue #{queue}: #{e.message}\n#{e.backtrace.join("\n")}"
- end
- end
- end
- end
- end
-end
+require 'toiler/actor/fetcher'
+require 'toiler/actor/processor'
+
+module Toiler
+ module Actor
+ # Actor that starts and supervises Toiler's actors
+ class Supervisor < Concurrent::Actor::RestartingContext
+ include Utils::ActorLogging
+
+ attr_accessor :client
+
+ def initialize
+ @client = ::Aws::SQS::Client.new
+ spawn_fetchers
+ spawn_processors
+ end
+
+ def on_message(_msg)
+ pass
+ end
+
+ def queues
+ Toiler.worker_class_registry
+ end
+
+ def spawn_fetchers
+ queues.each do |queue, _klass|
+ begin
+ fetcher = Actor::Fetcher.spawn! name: "fetcher_#{queue}".to_sym,
+ supervise: true, args: [queue, client]
+ Toiler.set_fetcher queue, fetcher
+ rescue StandardError => e
+ error "Failed to start Fetcher for queue #{queue}: #{e.message}\n#{e.backtrace.join("\n")}"
+ end
+ end
+ end
+
+ def spawn_processors
+ queues.each do |queue, klass|
+ name = "processor_pool_#{queue}".to_sym
+ count = klass.concurrency
+ begin
+ pool = Concurrent::Actor::Utils::Pool.spawn! name, count do |index|
+ Actor::Processor.spawn name: "processor_#{queue}_#{index}".to_sym,
+ supervise: true, args: [queue]
+ end
+ Toiler.set_processor_pool queue, pool
+ rescue StandardError => e
+ error "Failed to spawn Processor Pool for queue #{queue}: #{e.message}\n#{e.backtrace.join("\n")}"
+ end
+ end
+ end
+ end
+ end
+end