# frozen_string_literal: true

require 'aws-sdk-sqs'
require 'json'
require 'securerandom'
require_relative './base'

module HermesMessengerOfTheGods
  module Endpoints
    # Endpoint backed by an Amazon SQS queue.
    #
    # Reading: `work_off` polls the queue, hands every decoded message to the
    # caller's block, and deletes the messages that were handled successfully.
    # While a batch is being worked, a background thread keeps extending the
    # visibility timeout of the batch's messages.
    #
    # Writing: `transmit` sends a message to the queue.
    #
    # `endpoint`, `options`, `fetch_option` and `instrument` are not defined in
    # this file; presumably they are provided by Base.
    class Sqs < Base
      # Seconds added on top of the elapsed work time each time the visibility
      # timeout of in-progress messages is extended.
      VISIBILITY_EXTEND_DURATION = 120
      # Seconds the extender thread sleeps between two extension requests.
      VISIBILITY_EXTEND_FREQUENCY = 60

      # @return [Aws::SQS::QueuePoller] memoized poller for this endpoint
      def poller
        @poller ||= Aws::SQS::QueuePoller.new(endpoint)
      end

      # Options handed to the poller. `skip_delete: true` is always forced,
      # overriding any user-supplied value, because `work_off` deletes the
      # successfully handled messages itself.
      #
      # @return [Hash]
      def poll_options
        (options[:poll_options] || {}).merge(skip_delete: true)
      end

      # Polls the queue and yields each received message to the given block
      # (see #work_message for the block contract). Messages for which
      # #work_message returns true are deleted once the batch has been worked.
      #
      # @yield [message_body, message] decoded body and the raw SQS message
      def work_off(&blk)
        poller.poll(poll_options) do |messages, _stats|
          # Normalize to a list; presumably the poller yields either a single
          # message or several depending on the poll options.
          # NOTE(review): Array.wrap is not Ruby core (it looks like the
          # ActiveSupport extension) and nothing in this file requires it --
          # confirm it is loaded elsewhere. Kernel#Array is deliberately not
          # substituted: it would call #to_a on a lone message object.
          messages = Array.wrap(messages)

          working_messages(messages) do
            completed = messages.select { |msg| work_message(msg, &blk) }
            poller.delete_messages(completed) unless completed.empty?
          end
        end
      end

      # Decodes one message and yields it to the caller's block.
      #
      # The block can ask for the message to be kept on the queue by throwing
      # `:skip_delete`. A bare `throw :skip_delete` and
      # `throw :skip_delete, true` both keep the message; an explicit
      # `throw :skip_delete, false` still lets it be deleted.
      #
      # @param message [#body] raw SQS message
      # @yield [message_body, message]
      # @return [Boolean] true when the message should be deleted; false when
      #   deletion was skipped or a StandardError was raised while decoding or
      #   handling (the error is reported via `instrument(:read_failure, ...)`)
      def work_message(message)
        message_body = decode_message(message)

        finished = false
        thrown = catch(:skip_delete) do
          yield(message_body, message)
          finished = true
        end

        # `catch` returns nil for a bare `throw :skip_delete`; negating that
        # value directly would wrongly mark the message for deletion. Only a
        # normal completion or an explicitly thrown `false` allows deletion.
        finished || thrown == false
      rescue StandardError => e
        instrument(:read_failure, exception: e)
        false
      end

      # @return [Aws::SQS::Queue] memoized queue resource for this endpoint,
      #   built with `options[:client_options]` when given
      def queue
        @queue ||= Aws::SQS::Queue.new(endpoint, options[:client_options] || {})
      end

      # Reloads the queue resource and returns its data.
      def queue_data
        queue.reload.data
      end

      # Estimates whether the queue holds messages that are ready to be worked.
      #
      # NOTE(review): the SQS attribute docs describe
      # ApproximateNumberOfMessages as the count of messages available for
      # retrieval, which would already exclude in-flight and delayed messages;
      # subtracting them here may under-report pending work. Behavior is left
      # unchanged -- confirm the intent before adjusting.
      #
      # @return [Boolean]
      def has_pending_work?
        data = queue_data.attributes

        approximate_pending_messages = data["ApproximateNumberOfMessages"].to_i -
                                       data["ApproximateNumberOfMessagesNotVisible"].to_i -
                                       data["ApproximateNumberOfMessagesDelayed"].to_i

        # Just in case the math is off
        approximate_pending_messages > 0
      end

      # Sends a message to the queue.
      #
      # @param message [Object] payload; dumped to JSON when `options[:jsonify]`
      #   is set, otherwise sent as given
      # @param raw_message [Object] passed to `fetch_option(:send_options, ...)`
      #   to resolve the send options
      # @param dispatch_options [Hash] per-call options; they override the
      #   resolved send options, and `message_body` overrides both
      # @return the result of `queue.send_message`
      def transmit(message, raw_message, dispatch_options = {})
        send_opts = fetch_option(:send_options, raw_message) || {}

        message = JSON.dump(message) if options[:jsonify]

        queue.send_message(send_opts.merge(dispatch_options, message_body: message))
      end

      private

      # Parses the JSON body of a message. With `options[:from_sns]` set, the
      # 'Message' field of the parsed body is itself parsed as JSON and
      # returned instead.
      #
      # @raise [JSON::ParserError] when a body is not valid JSON
      def decode_message(message)
        message_body = JSON.parse(message.body)
        message_body = JSON.parse(message_body['Message']) if options[:from_sns]
        message_body
      end

      # Runs the given block while a background thread keeps extending the
      # visibility timeout of `messages`; the thread is always terminated
      # afterwards, even when the block raises.
      def working_messages(messages)
        thread = start_visibility_update_thread(messages)
        yield
      ensure
        # `thread` is nil when starting the thread itself failed.
        thread&.terminate
      end

      # Starts a thread that, every VISIBILITY_EXTEND_FREQUENCY seconds, sets
      # the visibility timeout of all `messages` to the time elapsed since work
      # started plus VISIBILITY_EXTEND_DURATION.
      #
      # @return [Thread]
      def start_visibility_update_thread(messages)
        start_work_time = Time.now

        Thread.new do
          loop do
            # Time subtraction yields a Float; the timeout is a whole number
            # of seconds, so truncate explicitly rather than relying on the
            # SDK to coerce it.
            new_time = (Time.now - start_work_time).to_i + VISIBILITY_EXTEND_DURATION

            queue.change_message_visibility_batch(
              entries: messages.map do |message|
                {
                  # A fresh UUID per entry keeps the ids unique in the batch.
                  id: SecureRandom.uuid,
                  receipt_handle: message.receipt_handle,
                  visibility_timeout: new_time
                }
              end
            )

            sleep VISIBILITY_EXTEND_FREQUENCY
          rescue StandardError => e
            STDERR.puts 'Error received trying to extend visibility'
            STDERR.puts e.message
            # Re-raising ends the extender thread.
            raise
          end
        end
      end
    end
  end
end