require_relative 'category' require_relative 'category_with_partitions' require_relative 'categories' require_relative 'categories/row' require_relative 'stream' require_relative 'streams' module Messaging module Adapters class Postgres # Message store adapter using Postgres and ActiveRecord. # Prefer accessing the message store through Messaging.message_store # instead of using it directly. # # @example Using Postgres as the default message store adapter: # # Put this in an initializer # Messaging.setup do |config| # config.message_store.adapter = :postgres # end # # @see Messaging.message_store class Store # @return [Streams] all the streams in the store # @see Streams attr_reader :streams # Should not be used directly. Access the store though # Messaging.message_store or Messaging::Adapters::Store[:postgres] # @api private def initialize @streams = Streams.new end # Get a specific stream by name # @return [Stream] # @see Messaging.stream def stream(name) streams[name] end # @return [Categories] all the stream categories in the store # @see Categories def categories Categories.new end # Get a specific category by name # @return [Category] # @see Messaging.category def category(name) categories[name] end # Access to all messages. # Use with caution in production as there are probably # a lot of messages so queries could take a long time or timeout. # # @example Check that a message has been added to the store with Rspec # expect do # # Your code that should add a message to the store # end.to change { Messaging.message_store.messages.count }.from(0).to(1) # # @return [SerializedMessage] def messages SerializedMessage end # Access to all messages in the given streams # # @param streams [Array] List of one or more streams to get messages from # @return [ActiveRecord::Relation] # @see Messaging.messages_in_streams def messages_in_streams(*streams) SerializedMessage.where(stream: streams.flatten.map(&:to_s)).order(:id) end # Writes the message to Postgres # Skips messages that hasn't defined a stream name # We do this to begin with so PG is opt-in per message # # @param message [Messaging::Message] The message to persist # @return [Messaging::Message] A new copy of the message with # the stream position added (if persisted) def call(message) return message unless message.stream_name SerializedMessage.create!(message: message).to_message rescue ActiveRecord::StatementInvalid => e category = message.category raise e unless e.message.include?('no partition of relation') raise e unless category || category.is_a?(CategoryWithPartitions) category.add_partition(date: Date.today) retry end end end end end