lib/fake_sqs/queue.rb in fake_sqs-0.4.1 vs lib/fake_sqs/queue.rb in fake_sqs-0.4.2

- old
+ new

@@ -1,9 +1,10 @@ require 'monitor' require 'securerandom' require 'fake_sqs/collection_view' require 'json' + module FakeSQS MessageNotInflight = Class.new(RuntimeError) ReadCountOutOfRange = Class.new(RuntimeError) ReceiptHandleIsInvalid = Class.new(RuntimeError) @@ -45,11 +46,13 @@ end def send_message(options = {}) with_lock do message = options.fetch(:message){ message_factory.new(options) } - @messages << message + if message + @messages[message.receipt] = message + end message end end def receive_message(options = {}) @@ -62,21 +65,20 @@ result = {} with_lock do actual_amount = amount > published_size ? published_size : amount - published_messages = @messages.select { |m| m.published? } + published_messages = @messages.values.select { |m| m.published? } actual_amount.times do message = published_messages.delete_at(rand(published_size)) - @messages.delete(message) + @messages.delete(message.receipt) unless check_message_for_dlq(message, options) message.expire_at(visibility_timeout) message.receive! - receipt = generate_receipt - @messages_in_flight[receipt] = message - result[receipt] = message + @messages_in_flight[message.receipt] = message + result[message.receipt] = message end end end result @@ -98,12 +100,12 @@ end memo end expired.each do |receipt,message| message.expire! - @messages << message - delete_message(receipt) + @messages[receipt] = message + @messages_in_flight.delete(receipt) end end end def change_message_visibility(receipt, visibility) @@ -111,12 +113,12 @@ message = @messages_in_flight[receipt] raise MessageNotInflight unless message if visibility == 0 message.expire! - @messages << message - delete_message(receipt) + @messages[receipt] = message + @messages_in_flight.delete(receipt) else message.expire_at(visibility) end end end @@ -132,25 +134,27 @@ end end def delete_message(receipt) with_lock do + @messages.delete(receipt) @messages_in_flight.delete(receipt) end end def reset with_lock do - @messages = [] + @messages = {} @messages_view = FakeSQS::CollectionView.new(@messages) reset_messages_in_flight end end def expire with_lock do - @messages += @messages_in_flight.values + @messages.merge!(@messages_in_flight) + @messages_in_flight.clear() reset_messages_in_flight end end def reset_messages_in_flight @@ -167,18 +171,14 @@ def messages_in_flight @messages_in_flight_view end def size - messages.size + @messages.size end def published_size - messages.select { |m| m.published? }.size - end - - def generate_receipt - SecureRandom.hex + @messages.values.select { |m| m.published? }.size end def with_lock @lock.synchronize do yield