module Messaging module Adapters class Postgres class CategoryWithPartitions < Category def add_partition(date:) from, to = partition_range_for(date: date) partition_name = partition_name_for(date: from) return if partition_exists?(partition_name) sql = <<~SQL CREATE TABLE IF NOT EXISTS messaging.#{partition_name} PARTITION OF messaging.#{table_name} FOR VALUES FROM ('#{from}') TO ('#{to}') SQL SerializedMessage.transaction do AdvisoryTransactionLock.call key: partition_name SerializedMessage.connection.execute sql end end # Creates multiple partitions # # @param start_date [Date] from which date to create partitions # @param days [Integer] how many days worth of partitions to create def add_partitions(start_date:, days: 1) first = start_date.to_date last = first + (days - 1) (first..last).each do |date| add_partition(date: date) end end # Removes a partition including the included messages # # @param partition_name [String] the name of the partition to drop def drop_partition(partition_name) return unless partition_name.match?(/^#{table_name}_\d{4}_\d{2}_\d{2}$/) SerializedMessage.connection.execute "drop TABLE messaging.#{partition_name}" end # Removes all partitions older than the given date # # @param date [Date] the cutoff date def drop_partitions_older_than(date) max_partition_name = partition_name_for(date: date) partitions.select { |p| p < max_partition_name }.each do |p| drop_partition(p) end end def partition_name_for(date:) "#{name}_%d_%02d_%02d" % [date.year, date.month, date.day] end def partition_range_for(date:) from = date.to_time.beginning_of_day to = from + 1.day [from, to] end def partition_exists?(partition_name) partitions.include? partition_name end def partitions sql = <<~SQL SELECT child.relname AS name FROM pg_inherits JOIN pg_class parent ON pg_inherits.inhparent = parent.oid JOIN pg_class child ON pg_inherits.inhrelid = child.oid JOIN pg_namespace ON pg_namespace.oid = parent.relnamespace WHERE pg_namespace.nspname = 'messaging' AND parent.relname = '#{table_name}' AND child.relkind = 'r' ORDER BY name SQL SerializedMessage.connection.select_values(sql) end def inspect "#" end end end end end