require "concurrent" require "net/http" require "aws-sdk" require 'sqspoller/logger/logger' require "sqspoller/poll/queue_poller" require "sqspoller/metrics/queue_stats_reporter" # QueueController Responsible for controlling ThreadPools per sqs queue. # Each queue can configure max number of threads to fetch messages from SQS and queue it. # For each queue there will be threadpool created and managed :workers_pool # Each worker thread will create Aws::SQS::QueuePoller and poll messages from SQS. module SqsPoller module Poller class QueueController private_class_method :new def initialize(queues_config, task_queue, aws_config) @queues_config = queues_config @aws_config = aws_config @task_queue = task_queue @logger = SqsPoller::Logger.get_new_logger(self.class.name) @counter = Concurrent::MutexAtomicFixnum.new(0) @sqs_client = Aws::SQS::Client.new(:access_key_id => @aws_config[:access_key_id], :secret_access_key => @aws_config[:secret_access_key], :region => @aws_config[:region]) @started = true # hash of threadpools by queue name @pollers_thread_pool = {} # hash of QueuePollers by queue name @pollers = {} @queue_urls = {} end def self.get return @instance if @instance raise "QueueController not yet started" end def self.delete_messages(queue, messages) self.get.delete_messages(queue, messages) end def self.delete_message(queue, message) self.get.delete_message(queue, message) end def delete_messages(queue, messages) @sqs_client.delete_message_batch( queue_url: get_queue_url(queue), entries: messages.map { |msg| { id: msg.message_id, receipt_handle: msg.receipt_handle } } ) end def get_queue_url(queue) @queue_urls[queue] end def delete_message(queue, message) delete_msg = { queue_url: get_queue_url(queue), receipt_handle: message.receipt_handle, } @client.delete_message(delete_msg) end def started? @started end def pollers @pollers end def self.start (queues_config, task_queue, aws_config) return @instance if @instance @instance = new(queues_config, task_queue, aws_config) @instance.start_queue_controller @instance.start_poller_stats_reporter @instance end def start_poller_stats_reporter SqsPoller::Metrics::SqsStatsReporter.new( { :queue_controller => self } ) end def start_queue_controller @queues_config.keys.each { |queue| queue_config = @queues_config[queue] polling_threads = queue_config[:polling_threads] if polling_threads == 0 @logger.info "Polling disabled for queue: #{queue}" next end started = false begin sqs_queue_config = @sqs_client.get_queue_url(queue_name: queue) started = start_pollers(polling_threads, queue, sqs_queue_config.queue_url, queue_config) @queue_urls[queue] = sqs_queue_config.queue_url rescue Exception => e @logger.error "Failed to start Queue Pollers. Caught error: #{e.message}\n #{e.backtrace.join("\n")}" end unless started @started = false @logger.error "Failed to start Queue Pollers for the queue #{queue}" end } end private def start_pollers(polling_threads, queue, queue_url, queue_config) workers = Concurrent::RubyThreadPoolExecutor.new(max_threads: polling_threads, min_threads: 1, max_queue: polling_threads) @pollers_thread_pool[queue] = workers @pollers[queue] = [] polling_threads.times do |index| worker = QueuePoller.new "QueuePoller-#{index}", queue, queue_config, @task_queue, @sqs_client, queue_url, @counter @pollers[queue] << worker workers.post do @logger.info "Starting QueuePoller-#{queue}-#{index} object for queue: #{queue}" loop do begin worker.run rescue Exception => e @logger.error "Poller killed for queue: #{e} and restarted" end end end sleep 0.01 end all_workers_started = workers.length == polling_threads all_workers_started end end end end