# frozen_string_literal: true

require_relative '../concerns/base'

module HermesMessengerOfTheGods
  module Endpoints
    # Common behaviour for message endpoints: option handling, message
    # transformation, and dispatch with retry/backoff.
    #
    # This class never defines +transmit+; +dispatch!+ calls
    # +transmit(transformed_message, raw_message, options)+, so concrete
    # endpoints are presumably expected to implement it (confirm against
    # subclasses). +instrument+ is likewise not defined here and is assumed to
    # come from HermesMessengerOfTheGods::Concerns::Base.
    class Base
      extend ActiveModel::Callbacks
      include HermesMessengerOfTheGods::Concerns::Base

      # Defaults merged underneath the options given to #initialize.
      #
      # * +retries+ - upper bound on attempts: #dispatch! retries only while
      #   the number of failures so far is below this value, so 3 means at
      #   most three attempts in total.
      # * +jitter+  - when truthy, each backoff sleep is scaled by a random
      #   factor drawn from 0.3..1.
      # * +backoff+ - a key of DEFAULT_RETRYS or any object responding to
      #   +call+; a falsy value disables sleeping between attempts.
      # * +jsonify+ - not read by this class (presumably used by subclasses).
      DEFAULT_OPTIONS = { retries: 3, jitter: true, backoff: :linear, jsonify: true }.freeze

      # Built-in backoff strategies, keyed by name. Each maps the 1-based retry
      # number to the number of seconds to wait before the next attempt.
      DEFAULT_RETRYS = {
        linear: ->(n) { n }, # Wait 1 sec, then 2 seconds, then 3...
        exponential: ->(n) { (2**n) / (n + 1).to_f } # Wait 1, then 1.3, then 2 sec
      }.freeze

      attr_accessor :options, :endpoint, :errors, :result, :name

      define_model_callbacks :dispatch

      # Wrap every dispatch (including its retries) in a :dispatch instrumentation event.
      around_dispatch { |_, blk| instrument(:dispatch, &blk) }

      # @param name [Object] identifier stored on the endpoint
      # @param endpoint [Object] destination stored on the endpoint; not
      #   interpreted by this class
      # @param options [Hash] overrides merged over the class's DEFAULT_OPTIONS
      #   (looked up through +self.class+, so subclasses may redefine it)
      def initialize(name, endpoint, options = {})
        self.name = name
        self.options = self.class::DEFAULT_OPTIONS.merge(options)
        self.endpoint = endpoint
        self.errors = []
      end

      # Non-raising variant of #dispatch!.
      #
      # @return [Boolean] true on success, false if any StandardError escaped
      #   #dispatch!
      def dispatch(*args)
        dispatch!(*args)
      rescue StandardError
        false
      end

      # Transforms and transmits +message+, retrying on failure.
      #
      # Every rescued error is appended to #errors; this class never clears
      # that list, so it accumulates across calls on the same instance.
      #
      # @param message [Object] the message to send
      # @param options [Hash] per-call options handed to +transmit+ unchanged.
      #   Note this local shadows the #options accessor inside this method
      #   only; #max_retries and #backoff still read the instance options.
      # @return [true] when transmission eventually succeeds
      # @raise [StandardError] the last transmission error once attempts are
      #   exhausted or #retry_from rejects the error
      def dispatch!(message, options = {})
        run_callbacks :dispatch do
          retry_number = 0
          begin
            self.result = transmit(transform_message(message), message, options)
          rescue StandardError => e
            errors << e
            retry_number += 1
            if retry_number < max_retries && retry_from(e)
              instrument(:dispatch_failure, try: retry_number, exception: e)
              backoff(retry_number)
              retry
            else
              instrument(:final_failure, try: retry_number, exception: e)
              raise
            end
          end
        end
        true
      end

      # Sleeps before the next attempt according to the configured strategy.
      #
      # @param retry_number [Integer] 1-based count of failures so far
      # @return [Integer, nil] the result of +sleep+, or nil when backoff is
      #   disabled or the strategy returned a falsy delay
      # @raise [ArgumentError] if +options[:backoff]+ is neither callable nor
      #   a key of DEFAULT_RETRYS
      def backoff(retry_number)
        strategy = options[:backoff]
        return unless strategy

        calculator = strategy.respond_to?(:call) ? strategy : DEFAULT_RETRYS[strategy]
        # Fail with an explicit configuration error rather than the opaque
        # NoMethodError that calling nil would raise mid-retry.
        unless calculator
          raise ArgumentError,
                "Unknown backoff strategy #{strategy.inspect}; " \
                "expected a callable or one of #{DEFAULT_RETRYS.keys.inspect}"
        end

        sleep_time = calculator.call(retry_number)
        return unless sleep_time

        sleep(sleep_time * (options[:jitter] ? rand(0.3..1) : 1))
      end

      # Converts +message+ into the form handed to +transmit+.
      #
      # Resolution order:
      # 1. +options[:transformer]+ - called with the message when callable,
      #    otherwise sent to the message as a method name (via +send+, so
      #    non-public methods are reachable too).
      # 2. <tt>to_<endpoint_class>_message</tt> on the message, where the name
      #    is the demodulized, underscored name of this endpoint's class.
      # 3. +_build_for_transmission+ on the message.
      # 4. The message itself, unchanged.
      #
      # @param message [Object]
      # @return [Object] the transformed message
      def transform_message(message)
        transformer_name = "to_#{self.class.to_s.demodulize.underscore}_message"
        transformer = options[:transformer]

        if transformer
          transformer.respond_to?(:call) ? transformer.call(message) : message.send(transformer)
        elsif message.respond_to?(transformer_name)
          message.public_send(transformer_name)
        elsif message.respond_to?(:_build_for_transmission)
          message._build_for_transmission
        else
          message
        end
      end

      # Decides whether a failed attempt may be retried.
      #
      # @param exception [Exception]
      # @return [Boolean] false only for Endpoints::FatalError (and subclasses)
      def retry_from(exception)
        !exception.is_a?(HermesMessengerOfTheGods::Endpoints::FatalError)
      end

      # @return [Integer] the configured +:retries+ option, or 0 when unset
      def max_retries
        options[:retries] || 0
      end

      # No-op hook; subclasses may override.
      def teardown; end

      # No-op hook; subclasses may override.
      def handle_success(_job); end

      # No-op hook; subclasses may override.
      def handle_failure(_job, e); end

      # Reads an option, invoking it with +args+ and +blk+ when it is callable.
      #
      # @param option [Symbol] key into #options
      # @return [Object] the resolved value, or an empty Hash when the value
      #   is nil or false
      def fetch_option(option, *args, &blk)
        if options[option].respond_to?(:call)
          options[option].call(*args, &blk)
        else
          options[option]
        end || {}
      end
    end
  end
end