# frozen_string_literal: true require 'aws-sdk-sqs' require 'json' require_relative './base' module HermesMessengerOfTheGods module Endpoints class Sqs < Base VISIBILITY_EXTEND_DURATION = 120 VISIBILITY_EXTEND_FREQUENCY = 60 def initialize(*args) super @message_mux = Monitor.new end def poller @poller ||= Aws::SQS::QueuePoller.new( endpoint, { client: HermesMessengerOfTheGods.configuration.sqs_client }.merge(options[:client_options] || {}) ) end def inflight_messages @message_mux.synchronize { @inflight_messages ||= [] } end def inflight_messages=(val) @message_mux.synchronize { @inflight_messages = val } end def shutting_down? @shutdown || false end # Basic Shutdown behavior: # Allow in-progress message to finish working. # Reset visbility timeout to all un-executed messages (from current message to end of array) to 0 so they move # to other works # # Break from polling def shutdown! warn 'Reveived shutdown signal' @shutdown = true end def poll_options (options[:poll_options] || {}).merge(skip_delete: true) end def work_off(&blk) poller.before_request { |_stats| throw :stop_polling if shutting_down? } poller.poll(poll_options) do |messages, _stats| self.inflight_messages = messages = Array.wrap(messages) working_messages do completion_results = messages.group_by do |msg| # We return false if we are shutting down so the messages are not deleted. # It is also possible that this process has already releases these SQS messages # back in to the queue so they may be picked up by another process. # # Work message returns true if the messager should be considered successful shutting_down? ? :shutdown : work_message(msg, &blk) end poller.delete_messages(completion_results[true]) unless completion_results.fetch(true, []).empty? # Messages skipped due to shutdowns get their visibility set back to 0 so they restart # normal failed jobs will be left in queue until their visibility timeout expires to indicate a backoff set_message_visibility(completion_results[:shutdown], 0) unless completion_results.fetch(:shutdown, []).empty? end end end def work_message(message) message_body = decode_message(message) skip_delete = catch(:skip_delete) do yield(message_body, message) false end !skip_delete rescue StandardError => e instrument(:read_failure, exception: e) false end def queue @queue ||= Aws::SQS::Queue.new( endpoint, { client: HermesMessengerOfTheGods.configuration.sqs_client }.merge(options[:client_options] || {}) ) end def queue_data queue.reload.data end def has_pending_work? data = queue_data.attributes approximate_pending_messages = data['ApproximateNumberOfMessages'].to_i - data['ApproximateNumberOfMessagesNotVisible'].to_i - data['ApproximateNumberOfMessagesDelayed'].to_i # Just in case the math is off approximate_pending_messages > 0 end def transmit(message, raw_message, dispatch_options = {}) send_opts = fetch_option(:send_options, raw_message) || {} message = JSON.dump(message) if options[:jsonify] queue.send_message(send_opts.merge(dispatch_options, message_body: message)) end private def decode_message(message) message_body = JSON.parse(message.body) message_body = JSON.parse(message_body['Message']) if options[:from_sns] message_body end def working_messages thread = start_visibility_update_thread yield ensure thread&.terminate end def start_visibility_update_thread start_work_time = Time.now Thread.new do loop do new_time = (Time.now - start_work_time) + VISIBILITY_EXTEND_DURATION set_message_visibility(inflight_messages, new_time) sleep VISIBILITY_EXTEND_FREQUENCY rescue StandardError => e say_warn 'Error received trying to extend visibility' say_warn e.message raise end end end def set_message_visibility(messages, new_time) queue.change_message_visibility_batch( entries: messages.collect do |message| { id: SecureRandom.uuid, receipt_handle: message.receipt_handle, visibility_timeout: new_time } end ) end end end end