module Pwwka
  # Primary interface for sending messages.
  #
  # Example:
  #
  #   # Send a message, blowing up if there's any problem
  #   Pwwka::Transmitter.send_message!({ user_id: @user.id }, "users.user.activated")
  #
  #   # Send a message, logging if there's any problem
  #   Pwwka::Transmitter.send_message_safely({ user_id: @user.id }, "users.user.activated")
  class Transmitter
    extend Pwwka::Logging
    include Pwwka::Logging

    attr_reader :channel_connector

    # Builds a transmitter backed by a fresh ChannelConnector.
    def initialize
      @channel_connector = ChannelConnector.new
    end

    # Send an important message that must go through. This method allows any raised exception
    # to pass through.
    #
    # payload:: Hash of what you'd like to include in your message
    # routing_key:: String routing key for the message
    # delayed:: Boolean send this message later
    # delay_by:: Integer milliseconds to delay the message; when nil, the default of
    #            send_delayed_message! is used
    #
    # Returns true
    #
    # Raises any exception generated by the innerworkings of this library.
    def self.send_message!(payload, routing_key, delayed: false, delay_by: nil)
      transmitter = new
      if !delayed
        transmitter.send_message!(payload, routing_key)
      elsif delay_by.nil?
        # Only an absent delay is omitted, so a nil payload can never shift the
        # positional arguments.
        transmitter.send_delayed_message!(payload, routing_key)
      else
        transmitter.send_delayed_message!(payload, routing_key, delay_by)
      end
      info "AFTER Transmitting Message on #{routing_key} -> #{payload}"
      # Return true explicitly rather than whatever the logger call returned.
      true
    end

    # Send a less important message that doesn't have to go through. This eats
    # any `StandardError` and logs it, returning false rather than blowing up.
    #
    # payload:: Hash of what you'd like to include in your message
    # routing_key:: String routing key for the message
    # delayed:: Boolean send this message later
    # delay_by:: Integer milliseconds to delay the message
    #
    # Returns true if the message was sent, false otherwise
    def self.send_message_safely(payload, routing_key, delayed: false, delay_by: nil)
      send_message!(payload, routing_key, delayed: delayed, delay_by: delay_by)
    rescue => e
      error "ERROR Transmitting Message on #{routing_key} -> #{payload}: #{e}"
      false
    end

    # Publish the payload, serialized as JSON, on the connector's topic exchange as a
    # persistent message, then close the connector's connection.
    #
    # payload:: object responding to +to_json+ (typically a Hash)
    # routing_key:: String routing key for the message
    #
    # Returns true
    #
    # Raises whatever the connector or the publish raises; the connection is
    # closed in that case as well.
    def send_message!(payload, routing_key)
      info "START Transmitting Message on #{routing_key} -> #{payload}"
      closing_connection do
        channel_connector.topic_exchange.publish(
          payload.to_json,
          routing_key: routing_key,
          persistent: true)
      end
      # if it gets this far it has succeeded
      info "END Transmitting Message on #{routing_key} -> #{payload}"
      true
    end

    # Publish the payload, serialized as JSON, on the connector's delayed exchange as a
    # persistent message whose +expiration+ is +delay_by+, then close the connector's
    # connection.
    #
    # payload:: object responding to +to_json+ (typically a Hash)
    # routing_key:: String routing key for the message
    # delay_by:: Integer milliseconds to delay the message (default 5000)
    #
    # Returns true
    #
    # Raises whatever +raise_if_delayed_not_allowed+, the queue setup or the publish
    # raises; the connection is closed in that case as well.
    def send_delayed_message!(payload, routing_key, delay_by = 5000)
      closing_connection do
        channel_connector.raise_if_delayed_not_allowed
        info "START Transmitting Delayed Message on #{routing_key} -> #{payload}"
        channel_connector.create_delayed_queue
        channel_connector.delayed_exchange.publish(
          payload.to_json,
          routing_key: routing_key,
          expiration: delay_by,
          persistent: true)
      end
      # if it gets this far it has succeeded
      info "END Transmitting Delayed Message on #{routing_key} -> #{payload}"
      true
    end

    private

    # Runs the given block and always closes the connector's connection afterwards,
    # so a failed publish does not leave the connection open.
    def closing_connection
      yield
    ensure
      channel_connector.connection_close
    end
  end
end