Sha256: d5485f0761560189ca82ddad30f61fb814ac055a912af58deade065538768790
Contents?: true
Size: 1.84 KB
Versions: 6
Compression:
Stored size: 1.84 KB
Contents
module Pwwka # Queue messages for sending in a batch # Primarily used when multiple messages need to sent from within a # transaction block # # Example: # # # instantiate a message_queuer object # message_queuer = MessageQueuerService.new # ActiveRecord::Base.transaction do # # do a thing, then queue message # message_queuer.queue_message(payload: {this: 'that'}, routing_key: 'go.to.there') # # # do another thing, then queue a delayed message # message_queuer.queue_message(payload: {the: 'other'}, routing_key: 'go.somewhere.else', delayed: true, delay_by: 3000) # end # # send the queued messages if we make it out of the transaction alive # message_queuer.send_messages_safely class MessageQueuer include Handling attr_reader :message_queue def initialize() @message_queue = [] end def queue_message(payload:, routing_key:, delayed: false, delay_by: nil) message_queue.push({ payload: payload, routing_key: routing_key, delayed: delayed, delay_by: delay_by }) end def send_messages_safely message_queue.each do |message| delay_hash = {delayed: message[:delayed], delay_by: message[:delay_by]}.delete_if{|_,v|!v} send_message_safely(*message_arguments(message)) end clear_messages end def send_messages! message_queue.each do |message| send_message!(*message_arguments(message)) end clear_messages end def clear_messages @message_queue.clear end private def message_arguments(message) delay_hash = {delayed: message[:delayed], delay_by: message[:delay_by]}.delete_if{|_,v|!v} [message[:payload], message[:routing_key], (delay_hash.any? ? delay_hash : nil)].compact end end end
Version data entries
6 entries across 6 versions & 1 rubygems