require 'active_record'
require_relative 'postgres/serialized_message'
require_relative 'postgres/consumer'
require_relative 'postgres/advisory_transaction_lock'
require_relative 'postgres/create_lock'
require_relative 'postgres/release_lock'
require_relative 'postgres/store'

module Messaging
  module Adapters
    # Adapter that uses Postgres, through Active Record, as a message store.
    #
    # Loading this file registers the adapter under the +:postgres+ key
    # (see {.register!}, which is invoked from the class body).
    #
    # @see Messaging::Adapters::Postgres::Store Store - for more information
    #   on how to use the message store capabilities provided by this adapter.
    class Postgres
      # Registers the Postgres adapter and its collaborators under the
      # +:postgres+ key in +Adapters+, +Adapters::Consumer+, +Adapters::Store+
      # and +Adapters::Dispatcher+.
      #
      # Does nothing when +Adapters+ already has a +:postgres+ key, so the
      # registrations are only performed once.
      #
      # @return [Object, nil] nil when already registered; otherwise whatever
      #   the last +register+ call returns
      def self.register!
        return if Adapters.key?(:postgres)

        Adapters.register(:postgres, memoize: true) { Postgres.new }
        # The consumer adapter is the adapter instance registered above.
        Adapters::Consumer.register(:postgres, memoize: true) { Adapters[:postgres] }
        Adapters::Store.register(:postgres, memoize: true) { Store.new }
        # The dispatcher is a lambda that ignores its argument and returns nil.
        Adapters::Dispatcher.register(:postgres, memoize: true) { ->(_) {} }
      end
      private_class_method :register!

      # Runs at load time; the private class method is reachable here because
      # the implicit receiver inside the class body is the class itself.
      register!

      # Returns the consumer record for the configured app and the given name,
      # creating it when no matching record is found.
      #
      # @param name [#to_s] consumer name; stored and looked up as a String
      # @param options [Hash] accepted but not used by this method
      # @return [Consumer] the first matching record, or the result of
      #   +Consumer.create+ when there is no match
      def create_consumer(name, **options)
        attributes = { app: Messaging.config.app_name, name: name.to_s }

        existing = Consumer.where(attributes).first
        existing || Consumer.create(attributes)
      end

      # Executes the DDL that sets up message storage: the +messaging+ schema,
      # the +messaging.messages_id_seq+ sequence, the +messaging.messages+
      # table (partitioned by list on +stream_category+) and three indexes
      # on it.
      #
      # NOTE: only the schema and sequence statements use +IF NOT EXISTS+; the
      # table and index statements are unguarded, so presumably this fails if
      # they already exist.
      #
      # @return [Object] whatever +connection.execute+ returns
      def create_messages_table
        connection.execute(<<~SQL)
          CREATE SCHEMA IF NOT EXISTS messaging;
          CREATE SEQUENCE IF NOT EXISTS messaging.messages_id_seq;

          CREATE TABLE messaging.messages (
            id bigint DEFAULT nextval('messaging.messages_id_seq'::regclass) NOT NULL,
            transaction_id xid8 DEFAULT pg_current_xact_id() NOT NULL,
            uuid uuid NOT NULL,
            stream character varying NOT NULL,
            stream_position bigint NOT NULL,
            message_type character varying NOT NULL,
            data jsonb,
            created_at timestamp without time zone NOT NULL,
            updated_at timestamp without time zone NOT NULL,
            stream_category character varying,
            stream_id character varying
          )
          PARTITION BY LIST (stream_category);

          CREATE INDEX messages_id_idx ON ONLY messaging.messages USING btree (id);
          CREATE INDEX messages_stream_category_id_idx ON ONLY messaging.messages USING btree (stream_category, id);
          CREATE INDEX messages_stream_category_stream_id_stream_position_idx ON ONLY messaging.messages USING btree (stream_category, stream_id, stream_position);
        SQL
      end

      private

      # Database connection used for raw SQL, taken from ActiveRecord::Base.
      #
      # @return [Object] the value of +ActiveRecord::Base.connection+
      def connection
        ActiveRecord::Base.connection
      end
    end
  end
end