lib/maitredee/adapters/sns_sqs_adapter.rb in maitredee-0.8.2 vs lib/maitredee/adapters/sns_sqs_adapter.rb in maitredee-0.8.3

- old
+ new

@@ -1,22 +1,30 @@ require "aws-sdk-sns" require "aws-sdk-sqs" module Maitredee module Adapters - class SnsSqsAdapter + class SnsSqsAdapter < BaseAdapter attr_reader :access_key_id, :secret_access_key, :region + # @param access_key_id [String] if `nil` will look in `ENV["MAITREDEE_AWS_ACCESS_KEY_ID"]` + # @param secret_access_key [String] if `nil` will look in `ENV["MAITREDEE_AWS_SECRET_ACCESS_KEY"]` + # @param region [String] if `nil` will look in `ENV["MAITREDEE_AWS_REGION"]` + # @param default_shoryuken_options [Hash] default options of the shoryuken job listening to the queues + # defaults to `{ body_parser: :json, auto_delete: true }` + def initialize(access_key_id: nil, secret_access_key: nil, region: nil, default_shoryuken_options: nil) @access_key_id = access_key_id || ENV["MAITREDEE_AWS_ACCESS_KEY_ID"] @secret_access_key = secret_access_key || ENV["MAITREDEE_AWS_SECRET_ACCESS_KEY"] @region = region || ENV["MAITREDEE_AWS_REGION"] @default_shoryuken_options = default_shoryuken_options Shoryuken.sqs_client = sqs_client end + # publishes message to SNS + # @param message [PublisherMessage] def publish(message) message_attributes = { message_id: message.message_id, topic_name: message.topic_name, event_name: message.event_name, @@ -30,10 +38,12 @@ message: message.body.to_json, message_attributes: sns_message_attributes(message_attributes) ) end + # creates topics from keys and queues from values, and subscribes queues to topics + # @param config [Hash{String => Array<String>}] def configure_broker(config) config.each do |topic_resource_name, queue_resource_names| queue_resource_names.each do |queue_resource_name| subscribe( topic_resource_name: topic_resource_name, @@ -41,30 +51,36 @@ ) end end end + # @api private def topics @topics ||= Hash.new do |hash, key| topic = sns_client.create_topic( name: key ) hash[key] = Aws::SNS::Topic.new(topic.topic_arn, client: sns_client) end end + # @api private def queues @queues ||= Hash.new do |hash, key| queue_url = sqs_client.create_queue(queue_name: key).queue_url hash[key] = Aws::SQS::Queue.new(queue_url, client: sqs_client) end end + # @api private def subscriptions @subscriptions ||= {} end + # subscribes a queue to a topic + # @param queue_resource_name [String] + # @param topic_resource_name [String] def subscribe(queue_resource_name:, topic_resource_name:) topic = topics[topic_resource_name] queue = queues[queue_resource_name] queue_arn = queue.attributes["QueueArn"] @@ -100,16 +116,19 @@ self.class.const_set worker_name, worker_class end worker_class end + # @api private def default_shoryuken_options @default_shoryuken_options ||= { body_parser: :json, auto_delete: true } end + # deletes all topics, queues, and subscriptions + # @api private def reset [topics, queues, subscriptions].each do |resource| resource.values.each(&:delete) resource.clear end