require "logger" require "concurrent" require "net/http" require "sqspoller/worker_task" require "sqspoller/common/ring_buffer" require "sqspoller/process/task_finalizer" require "sqspoller/common/utils" require "sqspoller/metrics/sqs_poller_metrics" # This Worker keep polls the task queue (SizedQueue with blocking) and delegate the message the message handler. # Once message is process based on its status will send to task finalizer if the message is successfully handled. # else it will ignore the message. Based on message visibility timeout in SQS it will reappear and processed. module SqsPoller module Process class Worker def initialize(worker_name, task_queue, task_finalizer, message_handler) @worker_name = worker_name @task_queue = task_queue @message_handler = message_handler @task_finalizer = task_finalizer @logger = SqsPoller::Logger.get_new_logger("#{self.class.name}-#{@worker_name}") end def run loop do task = @task_queue.pop message = task[:message] success = false timer = SqsPoller::Common::Utils.start_timer message_type= "UNKNOWN" begin @logger.debug "Starting worker task for message: #{message.message_id}" message_type = @message_handler.handle(message.body, message.message_id) @logger.debug "Finished worker task for message: #{message.message_id}" success = true rescue Exception => e @logger.error "Caught error: #{e.message}, #{e.backtrace.join("\n")} for message id: #{message.message_id}, body: #{message.body}" end elapsed_time = timer.stop queue_wait_time = timer.start_time - task[:queue_time] @logger.info "Task Completed queue_name: #{task[:queue_name]}, message_id: #{message.message_id}, message_type: #{message_type}, elapsed_time: #{elapsed_time}, queue_wait_time: #{queue_wait_time} message_count: #{task[:index]}" SqsPoller::Metrics.record(task, success, timer, elapsed_time) @task_finalizer.finalize(task) if success end end end end end