module Messaging
  module Adapters
    class Postgres
      # Polls the message store for new messages in the categories this
      # consumer is routed to, runs each one through the configured consumer
      # middlewares and `handle`, and records how far it has read on its own
      # row in `messaging.consumers`.
      #
      # NOTE(review): `name`, `app`, `last_processed_position` and
      # `last_processed_transaction_id` are not defined in this file;
      # presumably they are columns of `messaging.consumers`. `routes` and
      # `handle` presumably come from Messaging::Routing -- confirm.
      class Consumer < ActiveRecord::Base
        include Messaging::Routing

        self.table_name = 'messaging.consumers'

        # Maximum number of messages fetched per polling round.
        BATCH_SIZE = 1000

        # Asks PostgreSQL for the xmin of the current snapshot (per the
        # PostgreSQL docs: the lowest transaction id that is still active).
        #
        # @return [Object] the raw value returned by `select_value`
        def self.latest_known_transaction_id
          connection.select_value('select pg_snapshot_xmin(pg_current_snapshot())')
        end

        # Obtains the consumer lock and then processes messages until #stop
        # flips the running flag or an error propagates.
        #
        # The lock is obtained *before* entering the `ensure`-protected
        # section, so a failure to obtain it neither logs "Consumer stopped"
        # nor tries to release a lock this process never held.
        #
        # @return [nil]
        def start
          obtain_lock

          begin
            refresh_latest_processed_transaction_id
            Messaging.logger.info "[#{name}] Consumer started"
            @running = true
            process_messages
          ensure
            shutdown
          end
        end

        # Requests the polling loop to finish and saves the current position.
        # The loop only checks the flag between batches, so a batch that is
        # already in progress is completed first.
        def stop
          @running = false
          save
          Messaging.logger.info "[#{name}] Consumer stopping"
        end

        # Logs that the consumer has stopped and releases the consumer lock.
        def shutdown
          Messaging.logger.info "[#{name}] Consumer stopped"
          release_lock
        end

        private

        # Unique category names (as strings) across all routes; memoized.
        #
        # @return [Array<String>]
        def categories
          @categories ||= routes.flat_map(&:categories).map(&:to_s).uniq
        end

        # Reloads the record and, when no transaction id has been stored yet,
        # initialises it from the database's latest known transaction id.
        def refresh_latest_processed_transaction_id
          reload
          return if last_processed_transaction_id.present?

          self.last_processed_transaction_id = self.class.latest_known_transaction_id
          save
        end

        # Main polling loop: reports lag, handles one batch, then either
        # saves the new position or sleeps when the batch was empty.
        #
        # NOTE(review): the position is saved only after a whole batch has
        # been handled. If a handler raises mid-batch the exception leaves
        # this loop before `save`, and the next #start begins with `reload`,
        # so messages already handled in that batch would be fetched again.
        def process_messages
          while @running
            Meter.histogram('messaging.consumer.lag', unprocessed_messages_count, tags: { consumer: name })

            messages = fetch_messages
            messages.each { |message| process_message(message) }

            if messages.any?
              save
            else
              pause_polling_for_5_s
            end
          end
        end

        # Runs the message through the configured consumer middlewares and
        # `handle`, then advances the in-memory position (not saved here).
        #
        # @param message [Object] must respond to `global_position` and
        #   `transaction_id`
        def process_message(message)
          Middleware.run(Config.consumer.middlewares, message) { handle message }
          self.last_processed_position = message.global_position
          self.last_processed_transaction_id = message.transaction_id
        end

        # Logs that nothing was found and sleeps for five seconds.
        def pause_polling_for_5_s
          Messaging.logger.debug "[#{name}] No new messages. Sleeping"
          sleep 5
        end

        # Counts every message matching the fetch criteria, with the batch
        # limit removed. Issues its own query on each call.
        def unprocessed_messages_count
          fetch_messages.limit(nil).count
        end

        # Builds the query for the next batch: messages in this consumer's
        # categories, filtered by the two scopes below, ordered by id and
        # capped at BATCH_SIZE.
        #
        # NOTE(review): the scopes are defined elsewhere; judging by their
        # names they exclude messages from transactions that could still be
        # in flight and messages at or before the stored position -- confirm.
        def fetch_messages
          SerializedMessage
            .in_category(categories)
            .with_transaction_id_lower_than_any_currently_running_transaction
            .newer_than(last_processed_transaction_id, last_processed_position)
            .order(:id)
            .limit(BATCH_SIZE)
        end

        # Key shared by #obtain_lock and #release_lock so both always refer
        # to the same lock.
        #
        # @return [String]
        def lock_key
          "#{app}-consumer-#{name}"
        end

        def obtain_lock
          Messaging::Adapters::Postgres::CreateLock.call(key: lock_key)
        end

        def release_lock
          Messaging::Adapters::Postgres::ReleaseLock.call(key: lock_key)
        end
      end
    end
  end
end