lib/fake_sqs/queue.rb in fake_sqs-0.3.1 vs lib/fake_sqs/queue.rb in fake_sqs-0.4.0

- old
+ new

@@ -1,8 +1,8 @@ require 'securerandom' require 'fake_sqs/collection_view' - +require 'json' module FakeSQS MessageNotInflight = Class.new(RuntimeError) ReadCountOutOfRange = Class.new(RuntimeError) ReceiptHandleIsInvalid = Class.new(RuntimeError) @@ -36,41 +36,47 @@ end def attributes queue_attributes.merge( "QueueArn" => arn, - "ApproximateNumberOfMessages" => @messages.size, + "ApproximateNumberOfMessages" => published_size, "ApproximateNumberOfMessagesNotVisible" => @messages_in_flight.size, ) end def send_message(options = {}) with_lock do - message = message_factory.new(options) + message = options.fetch(:message){ message_factory.new(options) } @messages << message message end end def receive_message(options = {}) amount = Integer options.fetch("MaxNumberOfMessages") { "1" } + visibility_timeout = Integer options.fetch("VisibilityTimeout") { default_visibility_timeout } fail ReadCountOutOfRange, amount if amount > 10 return {} if @messages.empty? result = {} with_lock do - actual_amount = amount > size ? size : amount + actual_amount = amount > published_size ? published_size : amount + published_messages = @messages.select { |m| m.published? } actual_amount.times do - message = @messages.delete_at(rand(size)) - message.expire_at(default_visibility_timeout) - receipt = generate_receipt - @messages_in_flight[receipt] = message - result[receipt] = message + message = published_messages.delete_at(rand(published_size)) + @messages.delete(message) + unless check_message_for_dlq(message, options) + message.expire_at(visibility_timeout) + message.receive! + receipt = generate_receipt + @messages_in_flight[receipt] = message + result[receipt] = message + end end end result end @@ -109,11 +115,21 @@ @messages << message delete_message(receipt) else message.expire_at(visibility) end + end + end + def check_message_for_dlq(message, options={}) + if redrive_policy = queue_attributes["RedrivePolicy"] && JSON.parse(queue_attributes["RedrivePolicy"]) + dlq = options[:queues].list.find{|queue| queue.arn == redrive_policy["deadLetterTargetArn"]} + if dlq && message.approximate_receive_count >= redrive_policy["maxReceiveCount"].to_i + dlq.send_message(message: message) + message.expire! + true + end end end def delete_message(receipt) with_lock do @@ -151,9 +167,13 @@ @messages_in_flight_view end def size messages.size + end + + def published_size + messages.select { |m| m.published? }.size end def generate_receipt SecureRandom.hex end