# frozen_string_literal: true

require "logger"
require "concurrent"
require "net/http"
require "rest-client"
require "json"
require "sqspoller/logger/logger"
require "sqspoller/common/ring_buffer"
require "sqspoller/poll/queue_controller"

# TaskFinalizer deletes batches of completed messages from SQS.
#
# Completed tasks are held in a RingBuffer used as a cache. The cache is not
# persisted: every buffered message is lost if the process is killed or stopped.
# The buffer is created with twice the requested size; once it holds at least
# `buffer_size` tasks (half its capacity) the tasks are flushed and deleted via
# SQS batch delete (at most MAX_SQS_DELETE_BATCH_SIZE messages per call). A timer
# additionally flushes whatever is buffered every DEFAULT_FINALIZE_TIMER_DELAY
# seconds. When a batch delete fails, each task is deleted individually and a
# failing task is re-queued at most MAX_DELETE_RETRY times.
module SqsPoller
  module Process
    class TaskFinalizer
      DEFAULT_FINALIZE_TIMER_DELAY = 5.0
      MAX_SQS_DELETE_BATCH_SIZE = 10
      MAX_DELETE_RETRY = 5

      # @return [Integer] number of buffered tasks that triggers an immediate flush
      attr_reader :buffer_size

      # @param buffer_size [Integer] flush threshold; the backing ring buffer is
      #   allocated with twice this size.
      def initialize(buffer_size)
        @logger = SqsPoller::Logger.get_new_logger(self.class.name)
        @buffer_size = buffer_size
        @completed_tasks = SqsPoller::Common::RingBuffer.new(buffer_size * 2)
        schedule_timer
      end

      # Buffers a completed task and flushes once the threshold is reached.
      #
      # @param completed_task [Hash] task read via :queue_name and :message
      def finalize(completed_task)
        @completed_tasks.push completed_task
        return if @completed_tasks.count < @buffer_size

        # Push the periodic flush back, since a flush is happening right now.
        @timer.reset
        finalize_tasks
      end

      private

      # Arms a one-shot timer that flushes the buffer and then re-arms itself.
      def schedule_timer
        @timer = Concurrent::ScheduledTask.new(DEFAULT_FINALIZE_TIMER_DELAY) do
          begin
            finalize_tasks true
          rescue StandardError => e
            @logger.error "TaskFinalizer timer caught error: #{e.message}, #{Array(e.backtrace).join("\n")}"
          ensure
            # Always re-arm; otherwise a single failure would stop periodic flushing for good.
            schedule_timer
          end
        end
        @timer.execute
      end

      # Flushes the buffer and deletes the flushed messages from SQS, grouped by queue.
      #
      # @param scheduled [Boolean] true when invoked by the timer; the size
      #   threshold is then ignored and whatever is buffered gets flushed.
      def finalize_tasks(scheduled = false)
        return if !scheduled && @completed_tasks.count < @buffer_size

        completed_tasks = @completed_tasks.flush
        begin
          completed_tasks.group_by { |task| task[:queue_name] }.each do |queue, tasks|
            messages = tasks.map { |task| task[:message] }
            messages.each_slice(MAX_SQS_DELETE_BATCH_SIZE) do |msgs|
              SqsPoller::Poller::QueueController.delete_messages(queue, msgs)
            end
            @logger.debug "Deleted #{messages.size} messages from #{queue}"
          end
        rescue StandardError => e
          @logger.error "TaskFinalizer Caught error: #{e.message}, #{Array(e.backtrace).join("\n")}"
          # Fall back to one-by-one deletion for the whole flushed set.
          finalize_task_individually(completed_tasks)
        end
      end

      # Deletes each task's message on its own. A task whose delete fails is pushed
      # back into the buffer with an incremented :retry_delete counter, until
      # MAX_DELETE_RETRY is reached; after that it is dropped and logged.
      #
      # @param completed_tasks [Enumerable<Hash>] tasks to delete individually
      def finalize_task_individually(completed_tasks)
        completed_tasks.each do |task|
          begin
            SqsPoller::Poller::QueueController.delete_message(task[:queue_name], task[:message])
          rescue StandardError => e
            retry_delete = task[:retry_delete] || 0
            if retry_delete < MAX_DELETE_RETRY
              # Same key for read and write so the counter actually advances.
              task[:retry_delete] = retry_delete + 1
              @completed_tasks.push task
            else
              @logger.error "TaskFinalizer giving up on deleting a message from #{task[:queue_name]} " \
                            "after #{MAX_DELETE_RETRY} retries: #{e.message}"
            end
          end
        end
      end
    end
  end
end