require "aws-sdk-sns" require "aws-sdk-sqs" module Maitredee module Adapters class SnsSqsAdapter < BaseAdapter attr_reader :access_key_id, :secret_access_key, :region # @param access_key_id [String] if `nil` will look in `ENV["MAITREDEE_AWS_ACCESS_KEY_ID"]` # @param secret_access_key [String] if `nil` will look in `ENV["MAITREDEE_AWS_SECRET_ACCESS_KEY"]` # @param region [String] if `nil` will look in `ENV["MAITREDEE_AWS_REGION"]` # @param default_shoryuken_options [Hash] default options of the shoryuken job listening to the queues # defaults to `{ body_parser: :json, auto_delete: true }` def initialize(access_key_id: nil, secret_access_key: nil, region: nil, default_shoryuken_options: nil) @access_key_id = access_key_id || ENV["MAITREDEE_AWS_ACCESS_KEY_ID"] @secret_access_key = secret_access_key || ENV["MAITREDEE_AWS_SECRET_ACCESS_KEY"] @region = region || ENV["MAITREDEE_AWS_REGION"] @default_shoryuken_options = default_shoryuken_options Shoryuken.sqs_client = sqs_client end # publishes message to SNS # @param message [PublisherMessage] def publish(message) message_attributes = { message_id: message.message_id, topic_name: message.topic_name, event_name: message.event_name, primary_key: message.primary_key, schema_name: message.schema_name, maitredee_version: Maitredee::VERSION }.compact sns_client.publish( topic_arn: topics[message.topic_resource_name].arn, message: JSON.dump(message.body), message_attributes: sns_message_attributes(message_attributes) ) end # creates topics from keys and queues from values, and subscribes queues to topics # @param config [Hash{String => Array}] def configure_broker(config) config.each do |topic_resource_name, queue_resource_names| queue_resource_names.each do |queue_resource_name| subscribe( topic_resource_name: topic_resource_name, queue_resource_name: queue_resource_name ) end end end # @api private def topics @topics ||= Hash.new do |hash, key| topic = sns_client.create_topic( name: key ) hash[key] = Aws::SNS::Topic.new(topic.topic_arn, client: sns_client) end end # @api private def queues @queues ||= Hash.new do |hash, key| queue_url = sqs_client.create_queue(queue_name: key).queue_url hash[key] = Aws::SQS::Queue.new(queue_url, client: sqs_client) end end # @api private def subscriptions @subscriptions ||= {} end # subscribes a queue to a topic # @param queue_resource_name [String] # @param topic_resource_name [String] def subscribe(queue_resource_name:, topic_resource_name:) topic = topics[topic_resource_name] queue = queues[queue_resource_name] queue_arn = queue.attributes["QueueArn"] resp = sns_client.subscribe( topic_arn: topic.arn, protocol: "sqs", endpoint: queue_arn, attributes: { "RawMessageDelivery" => "true" } ) subscriptions[resp.subscription_arn] = Aws::SNS::Subscription.new(resp.subscription_arn, client: sns_client) queue.set_attributes( attributes: { "Policy" => sqs_policy( queue_arn: queue_arn, topic_arn: topic.arn ) } ) end def add_worker(subscriber_class) worker_name = "#{subscriber_class.name}Worker" worker_class = self.class.const_defined?(worker_name) unless worker_class worker_class = Class.new(Worker) worker_class.shoryuken_options default_shoryuken_options.merge( queue: subscriber_class.queue_resource_name ) worker_class.subscriber_class = subscriber_class self.class.const_set worker_name, worker_class end worker_class end # @api private def default_shoryuken_options @default_shoryuken_options ||= { body_parser: :json, auto_delete: true, retry_intervals: ->(attempts) { ((attempts - 1) ** 4) + 15 + (rand(30) * (attempts)) } } end # deletes all topics, queues, and subscriptions # @api private def reset [topics, queues, subscriptions].each do |resource| resource.values.each(&:delete) resource.clear end end private def sns_client @sns_client ||= new_client(Aws::SNS::Client) end def sqs_client @sqs_client ||= new_client(Aws::SQS::Client) end def new_client(klass) options = {} if access_key_id && secret_access_key options.merge!( access_key_id: access_key_id, secret_access_key: secret_access_key ) end options[:region] = region if region klass.new(options) end def sns_message_attributes(hash) hash.compact.each_with_object({}) do |(key, val), new_hash| new_hash[key.to_s] = { data_type: "String", string_value: val } end end def sqs_policy(queue_arn:, topic_arn:) <<~POLICY { "Version": "2008-10-17", "Id": "#{queue_arn}/SQSDefaultPolicy", "Statement": [ { "Sid": "#{queue_arn}-Sid", "Effect": "Allow", "Principal": { "AWS": "*" }, "Action": "SQS:*", "Resource": "#{queue_arn}", "Condition": { "StringEquals": { "aws:SourceArn": "#{topic_arn}" } } } ] } POLICY end # @private class Worker include Shoryuken::Worker class << self attr_accessor :subscriber_class end MESSAGE_KEYS = %w[ event_name maitredee_version primary_key schema_name topic_name ].freeze def perform(sqs_message, body) attributes = MESSAGE_KEYS.each_with_object({}) do |key, hash| hash[key.to_sym] = sqs_message.message_attributes[key]&.string_value end attributes.merge!( adapter_message: sqs_message, body: body, broker_message_id: sqs_message.message_id, message_id: sqs_message.message_id, raw_message: sqs_message.body, sent_at: Time.at(sqs_message.attributes["SentTimestamp"].to_i) ) subscriber_message = SubscriberMessage.new(attributes) self.class.subscriber_class.process(subscriber_message) end end end end end