Sha256: d5485f0761560189ca82ddad30f61fb814ac055a912af58deade065538768790

Contents?: true

Size: 1.84 KB

Versions: 6

Compression:

Stored size: 1.84 KB

Contents

module Pwwka
  # Queue messages for sending in a batch
  # Primarily used when multiple messages need to sent from within a 
  # transaction block
  #
  # Example:
  #
  #     # instantiate a message_queuer object
  #     message_queuer  = MessageQueuerService.new
  #     ActiveRecord::Base.transaction do
  #       # do a thing, then queue message
  #       message_queuer.queue_message(payload: {this: 'that'}, routing_key: 'go.to.there')
  #
  #       # do another thing, then queue a delayed message
  #       message_queuer.queue_message(payload: {the: 'other'}, routing_key: 'go.somewhere.else', delayed: true, delay_by: 3000)
  #     end
  #     # send the queued messages if we make it out of the transaction alive
  #     message_queuer.send_messages_safely


  class MessageQueuer

    include Handling

    attr_reader :message_queue

    def initialize()
      @message_queue  = []
    end

    def queue_message(payload:, routing_key:, delayed: false, delay_by: nil)
      message_queue.push({
            payload: payload,
        routing_key: routing_key,
            delayed: delayed,
           delay_by: delay_by
      })
    end

    def send_messages_safely
      message_queue.each do |message|
        delay_hash  = {delayed: message[:delayed], delay_by: message[:delay_by]}.delete_if{|_,v|!v}
        send_message_safely(*message_arguments(message))
      end
      clear_messages
    end

    def send_messages!
      message_queue.each do |message|
        send_message!(*message_arguments(message))
      end
      clear_messages
    end

    def clear_messages
      @message_queue.clear
    end

    private
    def message_arguments(message)
      delay_hash  = {delayed: message[:delayed], delay_by: message[:delay_by]}.delete_if{|_,v|!v}
      [message[:payload], message[:routing_key], (delay_hash.any? ? delay_hash : nil)].compact
    end

  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
pwwka-0.4.2 lib/pwwka/message_queuer.rb
pwwka-0.4.1 lib/pwwka/message_queuer.rb
pwwka-0.4.0 lib/pwwka/message_queuer.rb
pwwka-0.3.2 lib/pwwka/message_queuer.rb
pwwka-0.3.1 lib/pwwka/message_queuer.rb
pwwka-0.3.0 lib/pwwka/message_queuer.rb