require 'resque' rescue nil # optional dependency module Pwwka # Primary interface for sending messages. # # Example: # # # Send a message, blowing up if there's any problem # Pwwka::Transmitter.send_message!({ user_id: @user.id }, "users.user.activated") # # # Send a message, logging if there's any problem # Pwwka::Transmitter.send_message_safely({ user_id: @user.id }, "users.user.activated") class Transmitter extend Pwwka::Logging include Pwwka::Logging DEFAULT_DELAY_BY_MS = 5000 attr_reader :channel_connector def initialize @channel_connector = ChannelConnector.new end # Send an important message that must go through. This method allows any raised exception # to pass through. # # payload:: Hash of what you'd like to include in your message # routing_key:: String routing key for the message # delayed:: Boolean send this message later # delay_by:: Integer milliseconds to delay the message # on_error:: What is the behavior of # - :ignore (aka as send_message_safely) # - :raise # - :resque -- use Resque to try to send the message later # # Returns true # # Raises any exception generated by the innerworkings of this library. def self.send_message!(payload, routing_key, on_error: :raise, delayed: false, delay_by: nil) if delayed new.send_delayed_message!(*[payload, routing_key, delay_by].compact) else new.send_message!(payload, routing_key) end info "AFTER Transmitting Message on #{routing_key} -> #{payload}" rescue => e error "ERROR Transmitting Message on #{routing_key} -> #{payload}: #{e}" case on_error when :raise raise e when :resque begin send_message_async(payload, routing_key, delay_by_ms: delayed ? delay_by || DEFAULT_DELAY_BY_MS : 0) rescue => resque_exception warn(resque_exception.message) raise e end else # ignore end false end # Use Resque to enqueue the message. # - :delay_by_ms:: Integer milliseconds to delay the message. Default is 0. def self.send_message_async(payload, routing_key, delay_by_ms: 0) Resque.enqueue_in(delay_by_ms/1000, SendMessageAsyncJob, payload, routing_key) end # Send a less important message that doesn't have to go through. This eats # any `StandardError` and logs it, returning false rather than blowing up. # # payload:: Hash of what you'd like to include in your message # routing_key:: String routing key for the message # delayed:: Boolean send this message later # delay_by:: Integer milliseconds to delay the message # # Returns true if the message was sent, false otherwise # @deprecated This is ignoring a message. ::send_message supports this explicitly. def self.send_message_safely(payload, routing_key, delayed: false, delay_by: nil) send_message!(payload, routing_key, delayed: delayed, delay_by: delay_by, on_error: :ignore) end def send_message!(payload, routing_key) info "START Transmitting Message on #{routing_key} -> #{payload}" channel_connector.topic_exchange.publish( payload.to_json, routing_key: routing_key, persistent: true) channel_connector.connection_close # if it gets this far it has succeeded info "END Transmitting Message on #{routing_key} -> #{payload}" true end def send_delayed_message!(payload, routing_key, delay_by = DEFAULT_DELAY_BY_MS) channel_connector.raise_if_delayed_not_allowed info "START Transmitting Delayed Message on #{routing_key} -> #{payload}" channel_connector.create_delayed_queue channel_connector.delayed_exchange.publish( payload.to_json, routing_key: routing_key, expiration: delay_by, persistent: true) channel_connector.connection_close # if it gets this far it has succeeded info "END Transmitting Delayed Message on #{routing_key} -> #{payload}" true end end end