lib/propono/components/queue_subscription.rb in propono-1.7.0 vs lib/propono/components/queue_subscription.rb in propono-2.0.0.rc1

- old
+ new

@@ -1,38 +1,48 @@ module Propono class QueueSubscription - include Sns - include Sqs + attr_reader :aws_client, :propono_config, :topic_arn, :queue_name, :queue, :failed_queue, :corrupt_queue, :slow_queue - attr_reader :topic_arn, :queue_name, :queue, :failed_queue, :corrupt_queue, :slow_queue - - def self.create(topic_id, options = {}) - new(topic_id, options).tap do |subscription| + def self.create(*args) + new(*args).tap do |subscription| subscription.create end end - def initialize(topic_id, options = {}) - @topic_id = topic_id - @suffixed_topic_id = "#{topic_id}#{Propono.config.queue_suffix}" - @suffixed_slow_topic_id = "#{topic_id}#{Propono.config.queue_suffix}-slow" - @queue_name = "#{Propono.config.application_name.gsub(" ", "_")}-#{@suffixed_topic_id}" + def initialize(aws_client, propono_config, topic_name) + @aws_client = aws_client + @propono_config = propono_config + @topic_name = topic_name + @suffixed_topic_name = "#{topic_name}#{propono_config.queue_suffix}" + @suffixed_slow_topic_name = "#{topic_name}#{propono_config.queue_suffix}-slow" + @queue_name = "#{propono_config.application_name.gsub(" ", "_")}-#{@suffixed_topic_name}" end def create - raise ProponoError.new("topic_id is nil") unless @topic_id - @topic = TopicCreator.find_or_create(@suffixed_topic_id) - @queue = QueueCreator.find_or_create(queue_name) - @failed_queue = QueueCreator.find_or_create("#{queue_name}-failed") - @corrupt_queue = QueueCreator.find_or_create("#{queue_name}-corrupt") - sns.subscribe(@topic.arn, @queue.arn, 'sqs') - sqs.set_queue_attributes(@queue.url, "Policy", generate_policy(@queue, @topic)) + raise ProponoError.new("topic_name is nil") unless @topic_name + create_and_subscribe_main_queue + create_and_subscribe_slow_queue + create_misc_queues + end - @slow_queue = QueueCreator.find_or_create("#{queue_name}-slow") - @slow_topic = TopicCreator.find_or_create(@suffixed_slow_topic_id) - sns.subscribe(@slow_topic.arn, @slow_queue.arn, 'sqs') - sqs.set_queue_attributes(@slow_queue.url, "Policy", generate_policy(@slow_queue, @slow_topic)) + def create_and_subscribe_main_queue + @queue = aws_client.create_queue(queue_name) + topic = aws_client.create_topic(@suffixed_topic_name) + aws_client.subscribe_sqs_to_sns(@queue, topic) + aws_client.set_sqs_policy(@queue, generate_policy(@queue, topic)) + end + + def create_misc_queues + @failed_queue = aws_client.create_queue("#{queue_name}-failed") + @corrupt_queue = aws_client.create_queue("#{queue_name}-corrupt") + end + + def create_and_subscribe_slow_queue + @slow_queue = aws_client.create_queue("#{queue_name}-slow") + slow_topic = aws_client.create_topic(@suffixed_slow_topic_name) + aws_client.subscribe_sqs_to_sns(@slow_queue, slow_topic) + aws_client.set_sqs_policy(@slow_queue, generate_policy(@slow_queue, slow_topic)) end private def generate_policy(queue, topic)