module Propono
class Client
# Propono configuration settings.
# Settings should be set in an initializer or using some
# other method that insures they are set before any
# Propono code is used. They can be set as followed:
# Propono.config.access_key = "my-access-key"
# The following settings are allowed:
# * :access_key - The AWS access key
# * :secret_key - The AWS secret key
# * :queue_region - The AWS region
# * :application_name - The name of the application Propono
# is included in.
# * :queue_suffix - Optional string to append to topic and queue names.
# * :logger - A logger object that responds to puts.
attr_reader :config, :aws_client
def initialize(settings = {}, &block)
@config =
if block_given?
settings.each do |key, value|
config.send("#{key}=", value)
@aws_client =
def configure(&block)
yield config
# Publishes a new message into the Propono pub/sub network.
# This requires a topic and a message. By default this pushes
# out AWS SNS.
# @param [String] topic The name of the topic to publish to.
# @param [String] message The message to post.
def publish(topic, message, options = {})
suffixed_topic = "#{topic}#{config.queue_suffix}"
Publisher.publish(aws_client, config, suffixed_topic, message, options)
# Creates a new SNS-SQS subscription on the specified topic.
# This is implicitly called by {#listen}.
# @param [String] topic The name of the topic to subscribe to.
def subscribe(topic)
QueueSubscription.create(aws_client, config, topic)
# Listens on a queue and yields for each message
# Calling this will enter a queue-listening loop that
# yields the message_processor for each messages.
# This method will automatically create a subscription if
# one does not exist, so there is no need to call
# subscribe in addition.
# @param [String] topic The topic to subscribe to.
# @param &message_processor The block to yield for each message.
def listen(topic_name, &message_processor)
QueueListener.listen(aws_client, config, topic_name, &message_processor)
# Listens on a queue and yields for each message
# Calling this will enter a queue-listening loop that
# yields the message_processor for each messages. The
# loop will end when all messages have been processed.
# This method will automatically create a subscription if
# one does not exist, so there is no need to call
# subscribe in addition.
# @param [String] topic The topic to subscribe to.
# @param &message_processor The block to yield for each message.
def drain_queue(topic, &message_processor)
QueueListener.drain(aws_client, config, topic, &message_processor)