# frozen_string_literal: true

module Phobos
  # Builds the configured listeners and runs each one on its own thread of a
  # fixed-size thread pool. A listener whose +start+ raises is restarted after
  # a backoff pause until the executor is asked to stop.
  class Executor
    include Phobos::Instrumentation
    include Phobos::Log

    # Creates the listeners described by +Phobos.config.listeners+. Nothing is
    # started here; call #start to run them.
    def initialize
      @threads = Concurrent::Array.new
      @listeners = Phobos.config.listeners.flat_map { |config| build_listeners(config) }
    end

    # Posts one job per listener to a new thread pool sized to the number of
    # listeners, and records each worker thread so #stop can wake it up.
    #
    # NOTE(review): with zero configured listeners the pool is created with
    # size 0 - confirm Concurrent::FixedThreadPool accepts that.
    #
    # @return [true]
    def start
      @signal_to_stop = false
      @threads.clear
      @thread_pool = Concurrent::FixedThreadPool.new(@listeners.size)

      @listeners.each do |listener|
        @thread_pool.post do
          thread = Thread.current
          thread.abort_on_exception = true
          @threads << thread
          run_listener(listener)
        end
      end

      true
    end

    # Stops every listener, wakes the worker threads and waits for the thread
    # pool to terminate. Does nothing when a stop was already signalled.
    def stop
      return if @signal_to_stop

      instrument('executor.stop') do
        @signal_to_stop = true
        @listeners.each(&:stop)

        # Wake workers that may be sleeping (e.g. in the retry backoff) so
        # they can observe the stop signal.
        @threads.select(&:alive?).each do |thread|
          begin
            thread.wakeup
          rescue StandardError
            # Best effort: the thread may have died since the alive? check.
            nil
          end
        end

        @thread_pool&.shutdown
        @thread_pool&.wait_for_termination
        Phobos.logger.info { Hash(message: 'Executor stopped') }
      end
    end

    private

    # Builds the listeners for a single entry of the listeners configuration:
    # +max_concurrency+ instances (1 when not set), all created with the same
    # options.
    #
    # @param config [Object] one element of +Phobos.config.listeners+
    # @return [Array<Phobos::Listener>]
    def build_listeners(config)
      handler_class = config.handler.constantize
      listener_configs = config.to_hash.deep_symbolize_keys
      max_concurrency = listener_configs[:max_concurrency] || 1

      # The options are identical for every instance, so compute them once
      # instead of once per listener.
      listener_opts = listener_configs
                      .select { |key, _value| Constants::LISTENER_OPTS.include?(key) }
                      .merge(handler: handler_class)

      Array.new(max_concurrency) { Phobos::Listener.new(**listener_opts) }
    end

    # Describes an exception as a hash suitable for logging/instrumentation.
    #
    # @param exception [Exception]
    # @return [Hash] with :exception_class, :exception_message and :backtrace
    def error_metadata(exception)
      {
        exception_class: exception.class.name,
        exception_message: exception.message,
        backtrace: exception.backtrace
      }
    end

    # Runs +listener+ and restarts it every time +start+ raises, until a stop
    # has been signalled. An exception raised by the crash handling itself is
    # logged and re-raised.
    #
    # rubocop:disable Lint/RescueException
    def run_listener(listener)
      retry_count = 0

      begin
        listener.start
      rescue Exception => e
        handle_crashed_listener(listener, e, retry_count)
        retry_count += 1
        retry unless @signal_to_stop
      end
    rescue Exception => e
      log_error("Failed to run listener (#{e.message})", error_metadata(e))
      raise
    end
    # rubocop:enable Lint/RescueException

    # Logs and instruments a listener crash, then sleeps for the backoff
    # interval matching +retry_count+ (rounded to two decimals).
    #
    # When "listener#start" is interrupted it's safe to assume that the
    # consumer and the kafka client were properly stopped, so it's safe to
    # call start again.
    #
    # @param listener [Phobos::Listener] the listener that crashed
    # @param error [Exception] the exception raised by +listener.start+
    # @param retry_count [Integer] number of restarts attempted so far
    def handle_crashed_listener(listener, error, retry_count)
      backoff = listener.create_exponential_backoff
      interval = backoff.interval_at(retry_count).round(2)

      metadata = {
        listener_id: listener.id,
        retry_count: retry_count,
        waiting_time: interval
      }.merge(error_metadata(error))

      instrument('executor.retry_listener_error', metadata) do
        log_error("Listener crashed, waiting #{interval}s (#{error.message})", metadata)
        sleep interval
      end
    end
  end
end