# frozen_string_literal: true

require 'active_model'
require 'active_support'
require 'active_support/core_ext/hash'

module HermesMessengerOfTheGods
  module Concerns
    # Mixin that gives a class message behaviour: validation, dispatch through
    # an endpoint, attribute tracking and class-level circuit-breaker settings.
    #
    # Attributes declared with +attr_accessor+ on an including class are
    # recorded in +_defined_attributes+; that list drives #attributes and
    # .from_message.
    module Message
      extend ActiveSupport::Concern

      include ActiveModel::Model
      include EndpointBuilder::Helpers
      include LoggingHelpers

      # These two accessors are defined on the module itself, so they do not go
      # through the tracking +attr_accessor+ declared in +class_methods+ below
      # and are therefore not part of #attributes.
      attr_accessor :original_message, :source_endpoint

      # Raises unless the message passes its validations.
      #
      # @raise [HermesMessengerOfTheGods::ValidationError] when +valid?+ is false
      def validate!
        raise HermesMessengerOfTheGods::ValidationError.new(self) unless valid?
      end

      # Non-raising variant of #dispatch!.
      #
      # @return [Boolean] true on success, false if any StandardError was raised
      def dispatch
        dispatch!
      rescue StandardError
        false
      end

      # Asks the endpoint this message was received from to re-execute it later.
      #
      # @param future_time_or_seconds_in_future passed through unchanged to
      #   +source_endpoint.set_reexecution_time+
      # @raise [RuntimeError] when +source_endpoint+ or +original_message+ is
      #   unset, or the endpoint does not respond to +set_reexecution_time+
      def retry_at(future_time_or_seconds_in_future)
        raise 'unable to set visiblity' if source_endpoint.nil? || original_message.nil?

        unless source_endpoint.respond_to?(:set_reexecution_time)
          raise "endpoint type doesn't support setting execution time"
        end

        source_endpoint.set_reexecution_time(original_message, future_time_or_seconds_in_future)
      end

      # Validates the message and hands it to the endpoint.
      #
      # The endpoint call is skipped when the global +stub_dispatch+ config flag
      # is set; +endpoint.teardown+ runs either way.
      #
      # NOTE(review): the instance-level +endpoint+ and +say_error+ are not
      # defined in this file — presumably supplied by EndpointBuilder::Helpers
      # and LoggingHelpers; confirm.
      #
      # @param endpoint_args [Hash] forwarded to +endpoint.dispatch!+
      # @return [true]
      # @raise [HermesMessengerOfTheGods::ValidationError] when invalid
      # @raise [HermesMessengerOfTheGods::MessageDispatchFailed] when the
      #   endpoint raises a StandardError
      def dispatch!(endpoint_args: {})
        validate!

        begin
          endpoint.dispatch!(self, endpoint_args) unless HermesMessengerOfTheGods.config.stub_dispatch
        rescue StandardError => e
          say_error(e)
          raise HermesMessengerOfTheGods::MessageDispatchFailed, e.message
        ensure
          endpoint.teardown
        end

        true
      end

      # Delegates to #to_message.
      def _build_for_transmission
        to_message
      end

      # Payload representation of the message; defaults to #attributes.
      def to_message
        attributes
      end

      # @return [Hash] every tracked attribute name mapped to its current value
      def attributes
        (self.class._defined_attributes || []).each_with_object({}) do |attr, hsh|
          hsh[attr] = send(attr)
        end
      end

      def inspect
        "<#{self.class}: #{attributes}>"
      end

      included do
        class_attribute :_defined_attributes, :_endpoint, :_circuit_breaker_errors, :_max_consecutive_failures
      end

      class_methods do
        # Validates a batch of messages and dispatches them in one endpoint call.
        #
        # The endpoint of the first message is used for the whole batch.
        #
        # @param messages [Array] instances of the receiving class
        # @param options [Hash] forwarded to +bulk_dispatch!+
        # @return [nil] when +messages+ is empty, otherwise whatever
        #   +bulk_dispatch!+ returns
        # @raise [ArgumentError] when any element is not an instance of this class
        def send_messages(messages, options = {})
          return if messages.empty?

          # `self` is the message class here; `self.class` would just be `Class`.
          raise ArgumentError, "All messages must be #{self}" unless messages.all? { |m| m.is_a?(self) }

          messages.each(&:validate!)
          messages.first.endpoint.bulk_dispatch!(messages, options)
        end

        # Builds an instance from a hash, keeping only tracked attributes.
        #
        # Filtering uses string keys, so keys in +attrs+ must be strings (or the
        # hash must offer indifferent access) to be retained.
        #
        # @param attrs [Hash] raw attributes
        # @return [Object] a new instance of the receiving class
        def from_message(attrs = {}, &blk)
          # Guard against nil: no attr_accessor may have been declared yet.
          permitted = (_defined_attributes || []).map(&:to_s)
          new(attrs.slice(*permitted), &blk)
        end

        # Records the attribute names before defining the accessors as usual.
        def attr_accessor(*args)
          self._defined_attributes ||= []
          # `+=` assigns a new array rather than mutating the existing one.
          self._defined_attributes += args
          super
        end

        def endpoint
          _endpoint
        end

        def circuit_breaker_errors
          _circuit_breaker_errors
        end

        def max_consecutive_failures
          _max_consecutive_failures
        end

        # @param val [Integer, nil]
        # @raise [RuntimeError] when +val+ is neither an Integer nor nil
        def max_consecutive_failures=(val)
          raise 'Expected an Integer' unless val.nil? || val.is_a?(Integer)

          self._max_consecutive_failures = val
        end

        # Stores the circuit-breaker configuration after applying defaults
        # (+:sleep+ => 0, +:fatal+ => false) and validating each entry.
        #
        # Defaults are applied to shallow copies, so the hashes passed in by the
        # caller are left untouched.
        #
        # @param val [Hash, nil] error => { sleep: Numeric, fatal: Boolean };
        #   nil is treated as an empty hash
        # @raise [RuntimeError] when +val+ is not a Hash, or an entry has a
        #   non-numeric +:sleep+ or non-boolean +:fatal+
        def circuit_breaker_errors=(val)
          val ||= {}
          raise 'Expected a hash' unless val.is_a?(Hash)

          normalized = val.transform_values do |actions|
            settings = actions.dup
            settings[:sleep] ||= 0
            settings[:fatal] ||= false
            settings
          end

          normalized.each_value do |actions|
            raise ':sleep must be a number' unless actions[:sleep].is_a?(Numeric)
            raise ':fatal must be a boolean' unless [true, false].include?(actions[:fatal])
          end

          self._circuit_breaker_errors = normalized
        end

        def endpoint=(val)
          self._endpoint = val
        end
      end
    end
  end
end