module Messaging module Adapters class Postgres class Consumer < ActiveRecord::Base include Messaging::Routing self.table_name = 'messaging.consumers' def self.latest_known_transaction_id connection.select_value('select pg_snapshot_xmin(pg_current_snapshot())') end def start obtain_lock refresh_latest_processed_transaction_id Messaging.logger.info "[#{name}] Consumer started" @running = true process_messages ensure shutdown end def stop @running = false Messaging.logger.info "[#{name}] Consumer stopping" end def shutdown Messaging.logger.info "[#{name}] Consumer stopped" release_lock end private def categories @categories ||= routes.flat_map(&:categories).map(&:to_s).uniq end def refresh_latest_processed_transaction_id reload return if last_processed_transaction_id.present? self.last_processed_transaction_id = self.class.latest_known_transaction_id save_position rescue ActiveRecord::RecordNotFound => e recreate_consumer_record retry end def recreate_consumer_record self.id = nil @new_record = true @attributes = @attributes.except('id') if ::Rails::VERSION::MAJOR == 2 save end def process_messages while @running do Meter.histogram('messaging.consumer.lag', unprocessed_messages_count, tags: { consumer: name }) messages = fetch_messages messages.each do |message| break unless process_message(message) end save_position pause_polling_for_5_s unless messages.any? end ensure save_position end def save_position return unless last_processed_position_changed? || last_processed_transaction_id_changed? recreate_consumer_record unless self.class.exists? id save end def process_message(message) Middleware.run(Config.consumer.middlewares, message) { handle message } self.last_processed_position = message.global_position self.last_processed_transaction_id = message.transaction_id true rescue StandardError => e ExceptionHandler.call(e, { consumer: name, transaction_id: message.transaction_id, global_position: message.global_position, message: message.inspect } ) pause_to_avoid_creating_exception_storm false end def pause_to_avoid_creating_exception_storm sleep 10 end def pause_polling_for_5_s Messaging.logger.debug "[#{name}] No new messages. Sleeping" sleep 5 end def unprocessed_messages_count fetch_messages.limit(nil).count end def fetch_messages SerializedMessage.in_category(categories) .with_transaction_id_lower_than_any_currently_running_transaction .newer_than(last_processed_transaction_id, last_processed_position) .order(:id) .limit(1000) end def obtain_lock Messaging::Adapters::Postgres::CreateLock.(key: "#{app}-consumer-#{name}") end def release_lock Messaging::Adapters::Postgres::ReleaseLock.(key: "#{app}-consumer-#{name}") end end end end end