lib/toiler/actor/supervisor.rb in toiler-0.2.4 vs lib/toiler/actor/supervisor.rb in toiler-0.2.5
- old
+ new
@@ -3,10 +3,12 @@
module Toiler
module Actor
# Actor that starts and supervises Toiler's actors
class Supervisor < Concurrent::Actor::RestartingContext
+ include Utils::ActorLogging
+
attr_accessor :client
def initialize
@client = ::Aws::SQS::Client.new
spawn_fetchers
@@ -21,24 +23,32 @@
Toiler.worker_class_registry
end
def spawn_fetchers
queues.each do |queue, _klass|
- fetcher = Actor::Fetcher.spawn! name: "fetcher_#{queue}".to_sym,
- supervise: true, args: [queue, client]
- Toiler.set_fetcher queue, fetcher
+ begin
+ fetcher = Actor::Fetcher.spawn! name: "fetcher_#{queue}".to_sym,
+ supervise: true, args: [queue, client]
+ Toiler.set_fetcher queue, fetcher
+ rescue StandardError => e
+ error "Failed to start Fetcher for queue #{queue}: #{e.message}\n#{e.backtrace.join("\n")}"
+ end
end
end
def spawn_processors
queues.each do |queue, klass|
name = "processor_pool_#{queue}".to_sym
count = klass.concurrency
- pool = Concurrent::Actor::Utils::Pool.spawn! name, count do |index|
- Actor::Processor.spawn name: "processor_#{queue}_#{index}".to_sym,
- supervise: true, args: [queue]
+ begin
+ pool = Concurrent::Actor::Utils::Pool.spawn! name, count do |index|
+ Actor::Processor.spawn name: "processor_#{queue}_#{index}".to_sym,
+ supervise: true, args: [queue]
+ end
+ Toiler.set_processor_pool queue, pool
+ rescue StandardError => e
+ error "Failed to spawn Processor Pool for queue #{queue}: #{e.message}\n#{e.backtrace.join("\n")}"
end
- Toiler.set_processor_pool queue, pool
end
end
end
end
end