require "logger" require "concurrent" require "net/http" require "sqspoller/worker_task" require "sqspoller/common/ring_buffer" require "sqspoller/process/task_finalizer" require "sqspoller/common/utils" require "sqspoller/metrics/sqs_poller_metrics" require "sqspoller/process/task_worker" # This manages all process task workers threads using threadpools. # The number of thread can be configured in yaml config using concurrency: # Refer test.yaml module SqsPoller module Process class WorkerController private_class_method :new def initialize(worker_count, task_queue, worker_task, auto_tuning = true) @task_queue = task_queue @message_handler = worker_task @dynamic_scheduling = auto_tuning @worker_count = worker_count @task_workers = Concurrent::RubyThreadPoolExecutor.new(min_threads: @worker_count) @task_finalizer = TaskFinalizer.new(task_queue.max) @logger = SqsPoller::Logger.get_new_logger(self.class.name) @started = false end def self.get return @instance if @instance raise "WorkerController not yet started" end def self.start (worker_count, task_queue, worker_task, auto_tuning = true) return @instance if @instance @instance = new(worker_count, task_queue, worker_task, auto_tuning) @instance.start @instance end def started? @started end def start @logger.info "Starting #{@worker_count} workers" begin start_workers @started = true rescue Exception => e @logger.error "Task Worker killed for and restarted. Caught error: #{e.message}, #{e.backtrace.join("\n")}" end end def shutdown @logger.info "WorkerController Terminated" end private def start_workers @worker_count.times do |index| @task_workers.post do @logger.info "Starting worker #{index}" worker = Worker.new("Worker-#{index}", @task_queue, @task_finalizer, @message_handler) loop do begin worker.run rescue Exception => e @logger.error "Task Worker killed for and restarted. Caught error: #{e.message}, #{e.backtrace.join("\n")}" end end end end end end end end