# frozen_string_literal: true

require 'aws-sdk-sqs'
require 'json'
require 'monitor'
require 'securerandom'
require_relative './base'

begin
  require 'k8s-ruby'
rescue LoadError
  STDERR.puts 'k8s-ruby gem not found. Please install it to enable prefential scale-down of workers.'
end

module HermesMessengerOfTheGods
  module Endpoints
    # Endpoint backed by an AWS SQS queue.
    #
    # Supports sending (single and batched) and polling for messages. While a
    # batch of received messages is being worked, a background thread keeps
    # extending their visibility timeout. When the optional k8s-ruby gem is
    # loaded, the pod's "pod-deletion-cost" annotation is updated with the
    # number of messages currently being worked.
    class Sqs < Base
      # Seconds added on top of the elapsed work time at every visibility extension.
      VISIBILITY_EXTEND_DURATION = 120
      # Seconds the extension thread sleeps between two visibility extensions.
      VISIBILITY_EXTEND_FREQUENCY = 60

      # Memoised in-cluster Kubernetes client.
      #
      # @return [K8s::Client, nil] nil when the k8s-ruby gem is not loaded
      def self.k8s_client
        return unless defined?(K8s)

        @k8s_client ||= K8s::Client.in_cluster_config
      end

      # Writes +cost+ to this pod's "controller.kubernetes.io/pod-deletion-cost"
      # annotation. The pod is located through the SYSTEM_NAMESPACE and HOSTNAME
      # environment variables. Best effort: any error is printed to STDERR and
      # swallowed.
      #
      # @param cost [#to_s] the new deletion cost
      def self.set_deletion_cost(cost)
        client = k8s_client
        # Nothing to do without a client, or when the annotation already holds this value.
        return if client.nil? || @last_deletion_cost == cost

        client.api('v1').resource('pods', namespace: ENV['SYSTEM_NAMESPACE']).merge_patch(
          ENV['HOSTNAME'],
          {
            metadata: {
              annotations: { "controller.kubernetes.io/pod-deletion-cost" => cost.to_s },
            },
          }
        )
        # Only remembered after a successful patch so that a failed attempt is retried.
        @last_deletion_cost = cost
      rescue StandardError => e
        STDERR.puts "Error setting deletion cost: #{e.message}"
      end

      def initialize(*args)
        super
        # Separate monitors guard the in-flight message list and the work-timing state.
        @message_mux = Monitor.new
        @work_start_time_mux = Monitor.new
      end

      # @return [Aws::SQS::QueuePoller] memoised poller for this endpoint's queue
      def poller
        @poller ||= Aws::SQS::QueuePoller.new(
          endpoint,
          {
            client: sqs_client,
          }.merge(options[:client_options] || {}),
        )
      end

      # @return [Time, nil] when the current batch started being worked
      def work_start_time
        @work_start_time_mux.synchronize { @work_start_time }
      end

      # Marks "now" as the start of work on the current batch.
      def reset_work_start_time!
        @work_start_time_mux.synchronize { @work_start_time = Time.now }
      end

      # @return [Boolean] whether the most recent poll request yielded messages
      def received_work_in_last_check?
        @work_start_time_mux.synchronize { @received_work_in_last_check || false }
      end

      def received_work_in_last_check=(val)
        @work_start_time_mux.synchronize { @received_work_in_last_check = val }
      end

      # @return [Array] messages received in the current batch that this process is still responsible for
      def inflight_messages
        @message_mux.synchronize { @inflight_messages ||= [] }
      end

      def inflight_messages=(val)
        @message_mux.synchronize { @inflight_messages = val }
      end

      # @return [Boolean] true once shutdown! has been called
      def shutting_down?
        @shutdown || false
      end

      # Basic shutdown behavior:
      #   Allow the in-progress message to finish working.
      #   Reset the visibility timeout of all un-executed messages (from the current message to the end of the
      #   array) to 0 so they move to other workers.
      #
      #   Break from polling.
      def shutdown!
        warn 'Reveived shutdown signal'
        @shutdown = true
      end

      # @return [Hash] configured poll options with skip_delete forced on; deletion is done explicitly in work_off
      def poll_options
        (options[:poll_options] || {}).merge(skip_delete: true)
      end

      # Polls the queue and yields every decoded message to the given block
      # until shutdown! is called.
      #
      # @yield [message_body, message] the decoded body and the raw SQS message
      def work_off(&blk)
        poller.before_request do |_stats|
          throw :stop_polling if shutting_down?

          # The previous request came back empty, so this pod is idle.
          self.class.set_deletion_cost(0) unless received_work_in_last_check?
          self.received_work_in_last_check = false
        end

        poller.poll(poll_options) do |messages, _stats|
          self.inflight_messages = messages = Array.wrap(messages)
          self.received_work_in_last_check = true
          self.class.set_deletion_cost(messages.size)

          working_messages do
            completion_results = messages.group_by do |msg|
              # Messages are grouped under :shutdown when we are shutting down so they are not deleted.
              # It is also possible that this process has already released these SQS messages
              # back into the queue so they may be picked up by another process.
              #
              # work_message returns true if the message should be considered successful.
              shutting_down? ? :shutdown : work_message(msg, &blk)
            end

            # Only touch messages still in flight: set_reexecution_time removes rescheduled ones.
            # The emptiness check runs after the intersection so an empty batch is never sent to SQS.
            succeeded = completion_results.fetch(true, []) & inflight_messages
            poller.delete_messages(succeeded) unless succeeded.empty?

            # Messages skipped due to shutdown get their visibility set back to 0 so they restart immediately.
            # Normal failed jobs are left in the queue until their visibility timeout expires, which acts as
            # a backoff.
            skipped = completion_results.fetch(:shutdown, []) & inflight_messages
            set_message_visibility(skipped, 0) unless skipped.empty?
          end
        end
      end

      # Schedules +message+ to become visible again later and removes it from
      # the in-flight list so it is neither deleted nor extended any further.
      #
      # @param message the raw SQS message to reschedule
      # @param duration_or_time [Time, Numeric] an absolute time to re-run at, or a number of seconds from now
      def set_reexecution_time(message, duration_or_time)
        extend_time = if duration_or_time.is_a?(Time)
                        duration_or_time - Time.now
                      else
                        duration_or_time
                      end
        message_array = [message]

        @message_mux.synchronize do
          set_message_visibility(message_array, (Time.now - work_start_time) + extend_time)
          # Monitor is re-entrant, so the memoising reader is safe here and avoids a nil receiver.
          @inflight_messages = inflight_messages - message_array
        end
      end

      # Decodes +message+ and yields it to the caller's block.
      #
      # @yield [message_body, message]
      # @return [Boolean] true when the message should be deleted; false when the
      #   block threw :skip_delete with a truthy value or raised a StandardError
      def work_message(message)
        message_body = decode_message(message)
        skip_delete = catch(:skip_delete) do
          yield(message_body, message)
          false
        end
        !skip_delete
      rescue StandardError
        false
      end

      def sqs_client
        HermesMessengerOfTheGods.configuration.sqs_client
      end

      # @return [Aws::SQS::Queue] memoised queue resource for this endpoint
      def queue
        @queue ||= Aws::SQS::Queue.new(
          endpoint,
          {
            client: sqs_client,
          }.merge(options[:client_options] || {}),
        )
      end

      # Reloads and returns the queue's current data.
      def queue_data
        queue.reload.data
      end

      # @return [Boolean] whether the queue reports messages waiting to be worked
      def has_pending_work?
        data = queue_data.attributes
        # NOTE(review): AWS documents ApproximateNumberOfMessages as the visible-message count, separate from
        # the in-flight and delayed counts - confirm that subtracting them here is intended.
        approximate_pending_messages = data['ApproximateNumberOfMessages'].to_i -
                                       data['ApproximateNumberOfMessagesNotVisible'].to_i -
                                       data['ApproximateNumberOfMessagesDelayed'].to_i

        # Just in case the math is off
        approximate_pending_messages.positive?
      end

      # Builds the send_message_batch entry for +message+.
      #
      # @param message the payload; dumped to JSON when the :jsonify option is set
      # @param raw_message passed to fetch_option to resolve :send_options
      # @param dispatch_options [Hash] per-dispatch overrides merged over the send options
      # @return [Hash]
      def to_transmit_payload(message, raw_message, dispatch_options = {})
        send_opts = fetch_option(:send_options, raw_message) || {}
        message = JSON.dump(message) if options[:jsonify]
        send_opts.merge(dispatch_options, message_body: message)
      end

      def transmit(payload)
        bulk_transmit([payload])
      end

      # Sends +payloads+ in batches of 10 entries per send_message_batch call.
      #
      # @param payloads [Array<Hash>] entries as built by to_transmit_payload; not mutated
      # @return [true] when every entry was accepted
      # @raise [FatalError] when every failure is flagged as a sender fault
      # @raise [RuntimeError] when at least one failure is not a sender fault
      def bulk_transmit(payloads)
        # The generated id is only a default; an :id already present in the payload wins.
        all_entries = payloads.map { |payload| { id: SecureRandom.uuid }.merge(payload) }

        failed_msgs = []
        all_entries.each_slice(10) do |entries|
          resp = sqs_client.send_message_batch(queue_url: endpoint, entries: entries)
          failed_msgs.concat(resp.failed)
        end

        if failed_msgs.any?
          all_sender_fault = failed_msgs.all?(&:sender_fault)
          raise FatalError, "Error in dispatching: #{failed_msgs[0].message}" if all_sender_fault

          raise "Some messages failed to send due to recoverable error #{failed_msgs[0].message}"
        end

        true
      end

      private

      # Parses the JSON body of +message+; with the :from_sns option the nested
      # 'Message' field is parsed as JSON as well.
      def decode_message(message)
        message_body = JSON.parse(message.body)
        message_body = JSON.parse(message_body['Message']) if options[:from_sns]
        message_body
      end

      # Runs the block while a background thread keeps extending the visibility
      # of the in-flight messages; the thread is terminated when the block ends.
      def working_messages
        reset_work_start_time!
        thread = start_visibility_update_thread
        yield
      ensure
        thread&.terminate
      end

      # Starts the thread that periodically extends the visibility timeout of
      # the in-flight messages. An error is logged and then re-raised, which
      # ends the thread.
      def start_visibility_update_thread
        Thread.new do
          loop do
            new_time = (Time.now - work_start_time) + VISIBILITY_EXTEND_DURATION
            set_message_visibility(inflight_messages, new_time)
            sleep VISIBILITY_EXTEND_FREQUENCY
          rescue StandardError => e
            say_warn 'Error received trying to extend visibility'
            say_warn e.message
            raise
          end
        end
      end

      # Sets the visibility timeout of +messages+ to +new_time+ seconds in a
      # single batch request.
      def set_message_visibility(messages, new_time)
        # SQS rejects a batch request without entries, so skip the call entirely.
        return if messages.empty?

        payload = messages.map do |message|
          {
            id: SecureRandom.uuid,
            receipt_handle: message.receipt_handle,
            visibility_timeout: new_time,
          }
        end
        queue.change_message_visibility_batch(entries: payload)
      end
    end
  end
end