require "logger" module Outboxer module Publisher module_function @publishing = false @logger = Logger.new($stdout) Args = Struct.new(:message, :logger) # This method initiates the publishing process. It dequeues the messages one by one # and yields to a block that should contain the publishing logic. # # @param [Integer] poll # Sleep time in seconds between polling the queue when it's empty. # # @param [Proc] backoff # A Proc that takes the current backoff time and returns the new backoff time. # # @yieldparam [Args] args # An Args object containing the message to publish and a logger. def publish(poll: 1, backoff: ->(current_backoff) { [current_backoff * 2, 5 * 60].min }) @publishing = true ActiveRecord::Base.connection_pool.with_connection do while @publishing outboxer_message = dequeue(backoff: backoff) if outboxer_message.nil? sleep poll next end begin yield Args.new(outboxer_message.message, @logger) rescue StandardError => exception failed( outboxer_message: outboxer_message, backoff: backoff, exception: exception) next end published( outboxer_message: outboxer_message, backoff: backoff) end end end def dequeue(backoff:) retry_on_error(backoff: backoff) do ActiveRecord::Base.transaction do message = Models::Message .where(status: Models::Message::STATUS[:unpublished]) .order(created_at: :asc) .limit(1) .lock("FOR UPDATE SKIP LOCKED") .first message&.update!(status: Models::Message::STATUS[:publishing]) message end end end def published(outboxer_message:, backoff:) retry_on_error(backoff: backoff) do outboxer_message.destroy! end end def failed(outboxer_message:, exception:, backoff:) @logger.error( "Exception raised: #{exception.class}: #{exception.message}\n" \ "#{exception.backtrace.join("\n")}") retry_on_error(backoff: backoff) do ActiveRecord::Base.transaction do outboxer_message.update!(status: Models::Message::STATUS[:failed]) outboxer_message.exceptions.create!( class_name: exception.class.name, message_text: exception.message, backtrace: exception.backtrace) end end end def retry_on_error(backoff:, &block) current_backoff = 1 begin block.call rescue StandardError => exception @logger.fatal( "Exception raised: #{exception.class}: #{exception.message}\n" \ "#{exception.backtrace.join("\n")}") raise exception unless @publishing sleep current_backoff current_backoff = backoff.call(current_backoff) retry end end # Stops the publishing process. # # @note This method will stop the current message publishing process # It is a safe way to interrupt the publishing process at any point. # # @return [void] def stop @publishing = false end Signal.trap("TERM") do @logger.info("Received SIGTERM, stopping...") stop end private_class_method :retry_on_error, :dequeue, :published, :failed end end