lib/propono/components/queue_subscription.rb in propono-1.0.0 vs lib/propono/components/queue_subscription.rb in propono-1.1.0

- old
+ new

@@ -2,21 +2,22 @@ class QueueSubscription include Sns include Sqs - attr_reader :topic_arn, :queue_name, :queue, :failed_queue, :corrupt_queue + attr_reader :topic_arn, :queue_name, :queue, :failed_queue, :corrupt_queue, :slow_queue def self.create(topic_id, options = {}) new(topic_id, options).tap do |subscription| subscription.create end end def initialize(topic_id, options = {}) @topic_id = topic_id @suffixed_topic_id = "#{topic_id}#{Propono.config.queue_suffix}" + @suffixed_slow_topic_id = "#{topic_id}#{Propono.config.queue_suffix}-slow" @queue_name = "#{Propono.config.application_name.gsub(" ", "_")}-#{@suffixed_topic_id}" end def create raise ProponoError.new("topic_id is nil") unless @topic_id @@ -24,10 +25,15 @@ @queue = QueueCreator.find_or_create(queue_name) @failed_queue = QueueCreator.find_or_create("#{queue_name}-failed") @corrupt_queue = QueueCreator.find_or_create("#{queue_name}-corrupt") sns.subscribe(@topic.arn, @queue.arn, 'sqs') sqs.set_queue_attributes(@queue.url, "Policy", generate_policy) + + @slow_queue = QueueCreator.find_or_create("#{queue_name}-slow") + @slow_topic = TopicCreator.find_or_create(@suffixed_slow_topic_id) + sns.subscribe(@slow_topic.arn, @slow_queue.arn, 'sqs') + sqs.set_queue_attributes(@slow_queue.url, "Policy", generate_slow_policy) end private def generate_policy @@ -42,9 +48,29 @@ "Principal": { "AWS": "*" }, "Action": "SQS:*", "Resource": "#{@queue.arn}" + } + ] +} + EOS + end + + def generate_slow_policy + <<-EOS +{ + "Version": "2008-10-17", + "Id": "#{@slow_queue.arn}/SQSDefaultPolicy", + "Statement": [ + { + "Sid": "#{@slow_queue.arn}-Sid", + "Effect": "Allow", + "Principal": { + "AWS": "*" + }, + "Action": "SQS:*", + "Resource": "#{@slow_queue.arn}" } ] } EOS end