lib/maitredee/adapters/sns_sqs_adapter.rb in maitredee-0.8.2 vs lib/maitredee/adapters/sns_sqs_adapter.rb in maitredee-0.8.3
- old
+ new
@@ -1,22 +1,30 @@
require "aws-sdk-sns"
require "aws-sdk-sqs"
module Maitredee
module Adapters
- class SnsSqsAdapter
+ class SnsSqsAdapter < BaseAdapter
attr_reader :access_key_id, :secret_access_key, :region
+ # @param access_key_id [String] if `nil` will look in `ENV["MAITREDEE_AWS_ACCESS_KEY_ID"]`
+ # @param secret_access_key [String] if `nil` will look in `ENV["MAITREDEE_AWS_SECRET_ACCESS_KEY"]`
+ # @param region [String] if `nil` will look in `ENV["MAITREDEE_AWS_REGION"]`
+ # @param default_shoryuken_options [Hash] default options of the shoryuken job listening to the queues
+ # defaults to `{ body_parser: :json, auto_delete: true }`
+
def initialize(access_key_id: nil, secret_access_key: nil, region: nil, default_shoryuken_options: nil)
@access_key_id = access_key_id || ENV["MAITREDEE_AWS_ACCESS_KEY_ID"]
@secret_access_key = secret_access_key || ENV["MAITREDEE_AWS_SECRET_ACCESS_KEY"]
@region = region || ENV["MAITREDEE_AWS_REGION"]
@default_shoryuken_options = default_shoryuken_options
Shoryuken.sqs_client = sqs_client
end
+ # publishes message to SNS
+ # @param message [PublisherMessage]
def publish(message)
message_attributes = {
message_id: message.message_id,
topic_name: message.topic_name,
event_name: message.event_name,
@@ -30,10 +38,12 @@
message: message.body.to_json,
message_attributes: sns_message_attributes(message_attributes)
)
end
+ # creates topics from keys and queues from values, and subscribes queues to topics
+ # @param config [Hash{String => Array<String>}]
def configure_broker(config)
config.each do |topic_resource_name, queue_resource_names|
queue_resource_names.each do |queue_resource_name|
subscribe(
topic_resource_name: topic_resource_name,
@@ -41,30 +51,36 @@
)
end
end
end
+ # @api private
def topics
@topics ||= Hash.new do |hash, key|
topic = sns_client.create_topic(
name: key
)
hash[key] = Aws::SNS::Topic.new(topic.topic_arn, client: sns_client)
end
end
+ # @api private
def queues
@queues ||= Hash.new do |hash, key|
queue_url = sqs_client.create_queue(queue_name: key).queue_url
hash[key] = Aws::SQS::Queue.new(queue_url, client: sqs_client)
end
end
+ # @api private
def subscriptions
@subscriptions ||= {}
end
+ # subscribes a queue to a topic
+ # @param queue_resource_name [String]
+ # @param topic_resource_name [String]
def subscribe(queue_resource_name:, topic_resource_name:)
topic = topics[topic_resource_name]
queue = queues[queue_resource_name]
queue_arn = queue.attributes["QueueArn"]
@@ -100,16 +116,19 @@
self.class.const_set worker_name, worker_class
end
worker_class
end
+ # @api private
def default_shoryuken_options
@default_shoryuken_options ||= {
body_parser: :json,
auto_delete: true
}
end
+ # deletes all topics, queues, and subscriptions
+ # @api private
def reset
[topics, queues, subscriptions].each do |resource|
resource.values.each(&:delete)
resource.clear
end