require "concurrent"
require "net/http"
require "aws-sdk"
require 'sqspoller/common/utils'
require 'sqspoller/logger/logger'

# QueuePoller is responsible for polling batches of messages with
# Aws::SQS::QueuePoller, sized by the configured :max_number_of_messages, and
# pushing each received message onto the task queue.
# If no messages are available in SQS, it keeps polling (long-polling for up to
# :wait_time_seconds per request) until a message arrives.
module SqsPoller
  module Poller
    class QueuePoller
      # Default long-poll wait; also the largest wait time accepted from config.
      DEFAULT_SQS_WAIT_TIME_SECONDS = 20
      # Default batch size; also the largest batch size accepted from config.
      DEFAULT_SQS_MAX_NUMBER_OF_MESSAGES = 10

      # @param worker_name [String] used only to label this poller's logger
      # @param queue_name [String] attached to every task and used in log lines
      # @param queue_config [Hash] read for :wait_time_seconds and
      #   :max_number_of_messages (symbol keys)
      # @param task_queue [#push] destination for the task hashes built in #run
      # @param sqs_client [Object] client handed to Aws::SQS::QueuePoller
      # @param queue_url [String] URL of the SQS queue to poll
      # @param counter [#increment] supplies the :index of each task
      #   (presumably a shared atomic counter - confirm with the caller)
      def initialize(worker_name, queue_name, queue_config, task_queue, sqs_client, queue_url, counter)
        @worker_name = worker_name
        @queue_name = queue_name
        @queue_config = queue_config
        @task_queue = task_queue
        @wait_time_seconds = get_wait_time_seconds(@queue_config)
        @max_number_of_messages = get_max_number_of_messages(@queue_config)
        @sqs_client = sqs_client
        @queue_url = queue_url
        @logger = SqsPoller::Logger.get_new_logger("#{self.class.name}-#{@worker_name}")
        @counter = counter
        # Explicitly nil until #run has issued its first request.
        @poller_stats = nil
      end

      # @return [Object, nil] the stats object most recently handed to the
      #   before_request hook in #run, or nil if no request has been made yet
      def get_poller_stats
        @poller_stats
      end

      # Polls the queue and pushes one task hash per received message onto the
      # task queue. Messages are not deleted here (skip_delete: true), so
      # deletion is left to whoever consumes the task.
      #
      # NOTE(review): Aws::SQS::QueuePoller#poll loops until polling is stopped,
      # so this call is expected to block the calling thread - confirm callers
      # run it on a dedicated thread.
      def run
        poller = Aws::SQS::QueuePoller.new(@queue_url, { client: @sqs_client })

        # Keep a handle on the latest stats so get_poller_stats can expose them.
        poller.before_request { |stats| @poller_stats = stats }

        poller.poll(skip_delete: true,
                    :wait_time_seconds => @wait_time_seconds,
                    :max_number_of_messages => @max_number_of_messages) do |received|
          # With a batch size of 1 the SDK yields a bare message rather than an
          # array. Wrap it explicitly: Kernel#Array must not be used here, as it
          # would call #to_a on a Struct-like message and split it into fields.
          batch = @max_number_of_messages == 1 ? [received] : received

          batch.each { |message| @task_queue.push(build_task(message)) }
          @logger.info "Queued #{batch.size} messages from #{@queue_name}"
        end
      end

      # Resolves the long-poll wait time from the queue configuration.
      #
      # @param queue_config [Hash] read for :wait_time_seconds
      # @return [Integer] the configured value when it is an Integer within
      #   0..DEFAULT_SQS_WAIT_TIME_SECONDS, otherwise DEFAULT_SQS_WAIT_TIME_SECONDS
      def get_wait_time_seconds(queue_config)
        integer_within(queue_config[:wait_time_seconds],
                       0..DEFAULT_SQS_WAIT_TIME_SECONDS,
                       DEFAULT_SQS_WAIT_TIME_SECONDS)
      end

      # Resolves the receive batch size from the queue configuration.
      #
      # The lower bound is 1, not 0: SQS rejects a ReceiveMessage request whose
      # MaxNumberOfMessages is outside 1..10, so a configured 0 falls back to
      # the default instead of being forwarded.
      #
      # @param queue_config [Hash] read for :max_number_of_messages
      # @return [Integer] the configured value when it is an Integer within
      #   1..DEFAULT_SQS_MAX_NUMBER_OF_MESSAGES, otherwise
      #   DEFAULT_SQS_MAX_NUMBER_OF_MESSAGES
      def get_max_number_of_messages(queue_config)
        integer_within(queue_config[:max_number_of_messages],
                       1..DEFAULT_SQS_MAX_NUMBER_OF_MESSAGES,
                       DEFAULT_SQS_MAX_NUMBER_OF_MESSAGES)
      end

      private

      # Builds the task hash pushed onto the task queue for a single message.
      def build_task(message)
        {
          :message => message,
          :queue_name => @queue_name,
          :queue_time => SqsPoller::Common::Utils.get_current_time_in_millis,
          :index => @counter.increment
        }
      end

      # Returns value when it is an Integer inside bounds, otherwise fallback.
      # The type check keeps nil and non-numeric config values (e.g. strings)
      # from raising during comparison; they simply resolve to the fallback.
      def integer_within(value, bounds, fallback)
        return value if value.is_a?(Integer) && bounds.cover?(value)

        fallback
      end
    end
  end
end