lib/propono/components/queue_subscription.rb in propono-1.7.0 vs lib/propono/components/queue_subscription.rb in propono-2.0.0.rc1
- old
+ new
@@ -1,38 +1,48 @@
module Propono
class QueueSubscription
- include Sns
- include Sqs
+ attr_reader :aws_client, :propono_config, :topic_arn, :queue_name, :queue, :failed_queue, :corrupt_queue, :slow_queue
- attr_reader :topic_arn, :queue_name, :queue, :failed_queue, :corrupt_queue, :slow_queue
-
- def self.create(topic_id, options = {})
- new(topic_id, options).tap do |subscription|
+ def self.create(*args)
+ new(*args).tap do |subscription|
subscription.create
end
end
- def initialize(topic_id, options = {})
- @topic_id = topic_id
- @suffixed_topic_id = "#{topic_id}#{Propono.config.queue_suffix}"
- @suffixed_slow_topic_id = "#{topic_id}#{Propono.config.queue_suffix}-slow"
- @queue_name = "#{Propono.config.application_name.gsub(" ", "_")}-#{@suffixed_topic_id}"
+ def initialize(aws_client, propono_config, topic_name)
+ @aws_client = aws_client
+ @propono_config = propono_config
+ @topic_name = topic_name
+ @suffixed_topic_name = "#{topic_name}#{propono_config.queue_suffix}"
+ @suffixed_slow_topic_name = "#{topic_name}#{propono_config.queue_suffix}-slow"
+ @queue_name = "#{propono_config.application_name.gsub(" ", "_")}-#{@suffixed_topic_name}"
end
def create
- raise ProponoError.new("topic_id is nil") unless @topic_id
- @topic = TopicCreator.find_or_create(@suffixed_topic_id)
- @queue = QueueCreator.find_or_create(queue_name)
- @failed_queue = QueueCreator.find_or_create("#{queue_name}-failed")
- @corrupt_queue = QueueCreator.find_or_create("#{queue_name}-corrupt")
- sns.subscribe(@topic.arn, @queue.arn, 'sqs')
- sqs.set_queue_attributes(@queue.url, "Policy", generate_policy(@queue, @topic))
+ raise ProponoError.new("topic_name is nil") unless @topic_name
+ create_and_subscribe_main_queue
+ create_and_subscribe_slow_queue
+ create_misc_queues
+ end
- @slow_queue = QueueCreator.find_or_create("#{queue_name}-slow")
- @slow_topic = TopicCreator.find_or_create(@suffixed_slow_topic_id)
- sns.subscribe(@slow_topic.arn, @slow_queue.arn, 'sqs')
- sqs.set_queue_attributes(@slow_queue.url, "Policy", generate_policy(@slow_queue, @slow_topic))
+ def create_and_subscribe_main_queue
+ @queue = aws_client.create_queue(queue_name)
+ topic = aws_client.create_topic(@suffixed_topic_name)
+ aws_client.subscribe_sqs_to_sns(@queue, topic)
+ aws_client.set_sqs_policy(@queue, generate_policy(@queue, topic))
+ end
+
+ def create_misc_queues
+ @failed_queue = aws_client.create_queue("#{queue_name}-failed")
+ @corrupt_queue = aws_client.create_queue("#{queue_name}-corrupt")
+ end
+
+ def create_and_subscribe_slow_queue
+ @slow_queue = aws_client.create_queue("#{queue_name}-slow")
+ slow_topic = aws_client.create_topic(@suffixed_slow_topic_name)
+ aws_client.subscribe_sqs_to_sns(@slow_queue, slow_topic)
+ aws_client.set_sqs_policy(@slow_queue, generate_policy(@slow_queue, slow_topic))
end
private
def generate_policy(queue, topic)