lib/fake_sqs/queue.rb in fake_sqs-0.3.1 vs lib/fake_sqs/queue.rb in fake_sqs-0.4.0
- old
+ new
@@ -1,8 +1,8 @@
require 'securerandom'
require 'fake_sqs/collection_view'
-
+require 'json'
module FakeSQS
MessageNotInflight = Class.new(RuntimeError)
ReadCountOutOfRange = Class.new(RuntimeError)
ReceiptHandleIsInvalid = Class.new(RuntimeError)
@@ -36,41 +36,47 @@
end
def attributes
queue_attributes.merge(
"QueueArn" => arn,
- "ApproximateNumberOfMessages" => @messages.size,
+ "ApproximateNumberOfMessages" => published_size,
"ApproximateNumberOfMessagesNotVisible" => @messages_in_flight.size,
)
end
def send_message(options = {})
with_lock do
- message = message_factory.new(options)
+ message = options.fetch(:message){ message_factory.new(options) }
@messages << message
message
end
end
def receive_message(options = {})
amount = Integer options.fetch("MaxNumberOfMessages") { "1" }
+ visibility_timeout = Integer options.fetch("VisibilityTimeout") { default_visibility_timeout }
fail ReadCountOutOfRange, amount if amount > 10
return {} if @messages.empty?
result = {}
with_lock do
- actual_amount = amount > size ? size : amount
+ actual_amount = amount > published_size ? published_size : amount
+ published_messages = @messages.select { |m| m.published? }
actual_amount.times do
- message = @messages.delete_at(rand(size))
- message.expire_at(default_visibility_timeout)
- receipt = generate_receipt
- @messages_in_flight[receipt] = message
- result[receipt] = message
+ message = published_messages.delete_at(rand(published_size))
+ @messages.delete(message)
+ unless check_message_for_dlq(message, options)
+ message.expire_at(visibility_timeout)
+ message.receive!
+ receipt = generate_receipt
+ @messages_in_flight[receipt] = message
+ result[receipt] = message
+ end
end
end
result
end
@@ -109,11 +115,21 @@
@messages << message
delete_message(receipt)
else
message.expire_at(visibility)
end
+ end
+ end
+ def check_message_for_dlq(message, options={})
+ if redrive_policy = queue_attributes["RedrivePolicy"] && JSON.parse(queue_attributes["RedrivePolicy"])
+ dlq = options[:queues].list.find{|queue| queue.arn == redrive_policy["deadLetterTargetArn"]}
+ if dlq && message.approximate_receive_count >= redrive_policy["maxReceiveCount"].to_i
+ dlq.send_message(message: message)
+ message.expire!
+ true
+ end
end
end
def delete_message(receipt)
with_lock do
@@ -151,9 +167,13 @@
@messages_in_flight_view
end
def size
messages.size
+ end
+
+ def published_size
+ messages.select { |m| m.published? }.size
end
def generate_receipt
SecureRandom.hex
end