require 'securerandom'

require 'active_support'
require 'active_support/concern'
require 'active_support/core_ext/class'
require 'active_support/core_ext/class/attribute'
require 'active_support/core_ext/string/inflections'
require 'active_support/core_ext/time'
require 'dry-equalizer'

module Messaging
  # Mixin that turns a class into a message that can be published.
  #
  # Including it pulls in Virtus.model and Dry::Equalizer (equality based on
  # +attributes+), declares the +message_name+, +timestamp+ and +uuid+
  # attributes with defaults, and adds class-level configuration for the
  # topic and the key attribute.
  module Message
    extend ActiveSupport::Concern

    included do
      include Virtus.model
      include Dry::Equalizer(:attributes)

      attr_accessor :stream_position
      attr_accessor :expected_version

      # Use the uuid as message key unless the class picks another attribute.
      key_attribute :uuid

      attribute :message_name, String, default: ->(message, _) { message.class.message_name }
      attribute :timestamp, Time, default: ->(*) { Time.current.utc }
      attribute :uuid, String, default: ->(*) { SecureRandom.uuid }
    end

    module ClassMethods
      # Reads or sets the topic of the message class.
      #
      # By default the topic is the same as the name of the message, with the
      # "/" of a namespaced message replaced by "-" because "/" isn't valid in
      # a topic. To change the topic for a message just set it to whatever you
      # want in your class definition.
      #
      # @param topic_name [#to_s, nil] the topic to set; omit to only read
      # @return [String] the configured topic, or the default one
      def topic(topic_name = nil)
        # An explicit name always wins, even when the default has already
        # been memoized by an earlier read of the topic.
        @topic = topic_name.to_s unless topic_name.nil?
        @topic ||= default_topic_name
      end

      # The attribute whose value should be used as the key of the message.
      # Must specify an attribute if ordering is important.
      #
      # @param attribute [Symbol, String, nil] the attribute to use as key;
      #   omit to only read
      # @return [Symbol, String, nil] the key attribute set on this class, or
      #   the one of the superclass when this class has none of its own
      def key_attribute(attribute = nil)
        @key_attribute = attribute if attribute
        return @key_attribute if @key_attribute

        # The `included` block only runs for the class that includes Message,
        # so subclasses have no value of their own and ask their parent.
        superclass.key_attribute if superclass.respond_to?(:key_attribute)
      end

      # Topic used when none has been set explicitly: the topic of the
      # superclass when it has one, otherwise the message name with every
      # "/" replaced by "-".
      #
      # @return [String]
      def default_topic_name
        return superclass.topic if superclass.respond_to?(:topic)

        message_name.tr('/', '-')
      end

      # Shortcut for creating a new message and publishing it
      #
      # @param attributes [Hash] The attributes of the message
      # @option attributes [:any, Integer] :expected_version Concurrency control
      # @return [Message] the message just published
      # @raise [ExpectedVersion::Error] If the expected_version does not match
      def publish(attributes)
        new(attributes).publish
      end

      # @return [String] the underscored class name
      def message_name
        name.underscore
      end

      # @return [String] the class name as returned by +to_s+
      def message_type
        to_s
      end
    end

    # We do our own conversion for datetimes as by default they will only
    # be stored with microseconds, which makes us lose precision when we
    # serialize to json and back to message objects
    #
    # @return [Hash] the attributes, where every value that responds to
    #   +nsec+ is replaced by its ISO 8601 string with 9 fractional digits
    def attributes_as_json(*_args)
      attributes.transform_values do |value|
        value.respond_to?(:nsec) ? value.iso8601(9) : value
      end
    end

    # @return [Hash] the result of #attributes_as_json plus +stream_position+
    def as_json(*_args)
      attributes_as_json.merge(stream_position: stream_position)
    end

    # @return [Object, nil] the value of the attribute named by
    #   #key_attribute, or nil when there is no such attribute
    def message_key
      attributes[key_attribute&.to_sym]
    end

    # @return [Symbol, String, nil] the key attribute configured on the class
    def key_attribute
      self.class.key_attribute
    end

    # @return [String] the topic configured on the class
    def topic
      self.class.topic
    end

    # Define stream_name in your message class to override.
    #
    # @return [nil]
    def stream_name
      nil
    end

    # @return [String] the class name as returned by +to_s+
    def message_type
      self.class.to_s
    end

    # Publishes this message by calling Messaging::Publish with it.
    #
    # @return [Object] whatever Messaging::Publish returns
    def publish
      Messaging::Publish.(message: self)
    end
  end
end