# frozen_string_literal: true require 'concurrent' module Aws module ActiveJob module SQS # CLI runner for polling for SQS ActiveJobs class Executor DEFAULTS = { min_threads: 0, max_threads: Integer(Concurrent.available_processor_count || Concurrent.processor_count), auto_terminate: true, idletime: 60, # 1 minute fallback_policy: :abort # Concurrent::RejectedExecutionError must be handled }.freeze class << self def on_stop(&block) lifecycle_hooks[:stop] << block end def lifecycle_hooks @lifecycle_hooks ||= Hash.new { |h, k| h[k] = [] } end def clear_hooks @lifecycle_hooks = nil end end def initialize(options = {}) @executor = Concurrent::ThreadPoolExecutor.new(DEFAULTS.merge(options)) @retry_standard_errors = options[:retry_standard_errors] @logger = options[:logger] || ActiveSupport::Logger.new($stdout) @task_complete = Concurrent::Event.new end def execute(message) post_task(message) rescue Concurrent::RejectedExecutionError # no capacity, wait for a task to complete @task_complete.reset @task_complete.wait retry end def shutdown(timeout = nil) run_hooks_for(:stop) @executor.shutdown clean_shutdown = @executor.wait_for_termination(timeout) if clean_shutdown @logger.info 'Clean shutdown complete. All executing jobs finished.' else @logger.info "Timeout (#{timeout}) exceeded. Some jobs may not have " \ 'finished cleanly. Unfinished jobs will not be removed from ' \ 'the queue and can be ru-run once their visibility timeout ' \ 'passes.' end end private def post_task(message) @executor.post(message) do |message| job = JobRunner.new(message) @logger.info("Running job: #{job.id}[#{job.class_name}]") job.run message.delete rescue Aws::Json::ParseError => e @logger.error "Unable to parse message body: #{message.data.body}. Error: #{e}." rescue StandardError => e job_msg = job ? "#{job.id}[#{job.class_name}]" : 'unknown job' @logger.info "Error processing job #{job_msg}: #{e}" @logger.debug e.backtrace.join("\n") if @retry_standard_errors && !job.exception_executions? @logger.info( 'retry_standard_errors is enabled and job has not ' \ "been retried by Rails. Leaving #{job_msg} in the queue." ) else message.delete end ensure @task_complete.set end end def run_hooks_for(event_name) return unless (hooks = self.class.lifecycle_hooks[event_name]) hooks.each(&:call) end end end end end