require 'active_record' require_relative 'postgres/serialized_message' require_relative 'postgres/advisory_transaction_lock' require_relative 'postgres/store' module Messaging module Adapters # Adapter for using Postgres and Active Record as a message store. # @see Messaging::Adapters::Postgres::Store Store - for more information on how to use the message store # capabilities provided by this adapter. class Postgres def self.register! return if Adapters.key? :postgres Adapters.register(:postgres, memoize: true) { Postgres.new } Adapters::Store.register(:postgres, memoize: true) { Store.new } end private_class_method :register! register! def create_schema connection.execute 'CREATE SCHEMA IF NOT EXISTS messaging' end def create_id_sequence connection.execute 'CREATE SEQUENCE IF NOT EXISTS messaging.messages_id_seq' end def create_messages_table sql = <<~SQL CREATE TABLE messaging.messages ( id bigint DEFAULT nextval('messaging.messages_id_seq'::regclass) NOT NULL, uuid uuid NOT NULL, stream character varying NOT NULL, stream_position bigint NOT NULL, message_type character varying NOT NULL, data jsonb, created_at timestamp without time zone NOT NULL, updated_at timestamp without time zone NOT NULL, stream_category character varying, stream_id character varying ) PARTITION BY LIST (stream_category); CREATE INDEX messages_id_idx ON ONLY messaging.messages USING btree (id); CREATE INDEX messages_stream_category_id_idx ON ONLY messaging.messages USING btree (stream_category, id); CREATE INDEX messages_stream_category_stream_id_stream_position_idx ON ONLY messaging.messages USING btree (stream_category, stream_id, stream_position); SQL connection.execute sql end def create_category(category_name) table_name = category_name.parameterize(separator: '_') sql = <<~SQL CREATE TABLE messaging.#{table_name} PARTITION OF messaging.messages FOR VALUES IN ('#{category_name}'); SQL connection.execute sql end def drop_category(category_name) table_name = category_name.parameterize(separator: '_') sql = <<~SQL drop TABLE messaging.#{table_name} SQL connection.execute sql end def connection ActiveRecord::Base.connection end end end end