# frozen_string_literal: true require 'active_model' require 'active_support' require 'active_support/core_ext/hash' require 'hermes_messenger_of_the_gods/concerns/base' module HermesMessengerOfTheGods module Concerns module Message extend ActiveSupport::Concern include ActiveModel::Model include HermesMessengerOfTheGods::Concerns::Base attr_accessor :targeted_endpoints def original_message @original_message end def initialize(options = {}) @monitor = Monitor.new self.targeted_endpoints = options.delete(:endpoints) super end def validate! raise HermesMessengerOfTheGods::ValidationError, self unless valid? end def dispatch dispatch! rescue StandardError false end def dispatch_async HermesMessengerOfTheGods.increment_async_dispatches_in_progress Thread.new do dispatch ensure HermesMessengerOfTheGods.decrement_async_dispatches_in_progress end end def dispatch_async! HermesMessengerOfTheGods.increment_async_dispatches_in_progress Thread.new do dispatch! ensure HermesMessengerOfTheGods.decrement_async_dispatches_in_progress end end def dispatch!(endpoint_args: {}) run_callbacks :dispatch do validate! endpoints.collect do |ep_name, endpoint| next if targeted_endpoints && !targeted_endpoints.include?(ep_name) begin endpoint.dispatch!(self, endpoint_args) unless HermesMessengerOfTheGods.config.stub_dispatch register_success(ep_name, endpoint.result) rescue StandardError => e say_error(e) register_failure(ep_name, e) ensure endpoint.teardown end end unless dispatch_errors.empty? klass = if successes.empty? HermesMessengerOfTheGods::MessageDispatchTotalFailure else HermesMessengerOfTheGods::MessageDispatchPartialFailure end ex = klass.new ex.exceptions = dispatch_errors raise ex end end true end def register_failure(ep_name, error) @monitor.synchronize do instrument(:dispatch_failure, endpoint_name: ep_name, exception: error) dispatch_errors[ep_name] = error end end def register_success(ep_name, return_value) @monitor.synchronize do successes[ep_name] = return_value end end def dispatch_errors @dispatch_errors ||= {} end def successes @successes ||= {} end def _build_for_transmission to_message end def to_message attributes end def attributes (self.class._defined_attributes || []).each.with_object({}) do |attr, hsh| hsh[attr] = send(attr) end end def inspect "<#{self.class}: #{attributes}>" end included do class_attribute :_defined_attributes, :_endpoints, :_circuit_breaker_errors, :_max_consecutive_failures define_model_callbacks :dispatch around_dispatch { |_, blk| instrument(:dispatch, &blk) } end class_methods do def from_message(attrs = {}, &blk) attrs = attrs.slice(*_defined_attributes.map(&:to_s)) new(attrs, &blk) end def attr_accessor(*args) self._defined_attributes ||= [] self._defined_attributes += args super end def endpoints _endpoints end def circuit_breaker_errors _circuit_breaker_errors end def max_consecutive_failures _max_consecutive_failures end def max_consecutive_failures=(val) raise 'Expected an Integer' unless val.is_a?(Integer) || val.is_a?(NilClass) self._max_consecutive_failures = val end def circuit_breaker_errors=(val) val ||= {} raise 'Expected a hash' unless val.is_a?(Hash) val.each do |err, actions| val[err][:sleep] ||= 0 val[err][:fatal] ||= false end val.each do |err, actions| raise ':sleep must be a number' unless actions[:sleep].is_a?(Numeric) raise ':fatal must be a boolean' unless [true, false].include?(actions[:fatal]) end self._circuit_breaker_errors = val end def endpoints=(val) unless val.is_a?(Hash) raise 'Endpoints expects a hash {endpoint_name: endpoint_handler}' end self._endpoints = val end end end end end