lib/fake_sqs/queue.rb in fake_sqs-0.0.11 vs lib/fake_sqs/queue.rb in fake_sqs-0.1.0
- old
+ new
@@ -1,90 +1,169 @@
require 'securerandom'
+require 'fake_sqs/collection_view'
module FakeSQS
+ MessageNotInflight = Class.new(RuntimeError)
ReadCountOutOfRange = Class.new(RuntimeError)
+ ReceiptHandleIsInvalid = Class.new(RuntimeError)
class Queue
- attr_reader :name, :messages, :message_factory, :messages_in_flight, :arn, :queue_attributes
+ VISIBILITY_TIMEOUT = 30
+ attr_reader :name, :message_factory, :arn, :queue_attributes
+
def initialize(options = {})
@message_factory = options.fetch(:message_factory)
@name = options.fetch("QueueName")
@arn = options.fetch("Arn") { "arn:aws:sqs:us-east-1:#{SecureRandom.hex}:#{@name}" }
@queue_attributes = options.fetch("Attributes") { {} }
+ @lock = Monitor.new
reset
end
def to_yaml
{
- "QueueName" => name,
- "Arn" => arn,
- "Attributes" => queue_attributes,
+ "QueueName" => name,
+ "Arn" => arn,
+ "Attributes" => queue_attributes,
}
end
def add_queue_attributes(attrs)
queue_attributes.merge!(attrs)
end
def attributes
queue_attributes.merge(
"QueueArn" => arn,
- "ApproximateNumberOfMessages" => messages.size,
- "ApproximateNumberOfMessagesNotVisible" => messages_in_flight.size,
+ "ApproximateNumberOfMessages" => @messages.size,
+ "ApproximateNumberOfMessagesNotVisible" => @messages_in_flight.size,
)
end
def send_message(options = {})
- message = message_factory.new(options)
- messages << message
- message
+ with_lock do
+ message = message_factory.new(options)
+ @messages << message
+ message
+ end
end
def receive_message(options = {})
amount = Integer options.fetch("MaxNumberOfMessages") { "1" }
fail ReadCountOutOfRange, amount if amount > 10
- return {} if messages.empty?
+ return {} if @messages.empty?
result = {}
- actual_amount = amount > size ? size : amount
+ with_lock do
+ actual_amount = amount > size ? size : amount
- actual_amount.times do
- message = messages.delete_at(rand(size))
- receipt = generate_receipt
- messages_in_flight[receipt] = message
- result[receipt] = message
+ actual_amount.times do
+ message = @messages.delete_at(rand(size))
+ message.expire_at(default_visibility_timeout)
+ receipt = generate_receipt
+ @messages_in_flight[receipt] = message
+ result[receipt] = message
+ end
end
result
end
+ def default_visibility_timeout
+ if value = attributes['VisibilityTimeout']
+ value.to_i
+ else
+ VISIBILITY_TIMEOUT
+ end
+ end
+
+ def timeout_messages!
+ with_lock do
+ expired = @messages_in_flight.inject({}) do |memo,(receipt,message)|
+ if message.expired?
+ memo[receipt] = message
+ end
+ memo
+ end
+ expired.each do |receipt,message|
+ message.expire!
+ @messages << message
+ delete_message(receipt)
+ end
+ end
+ end
+
+ def change_message_visibility(receipt, visibility)
+ with_lock do
+ message = @messages_in_flight[receipt]
+ raise MessageNotInflight unless message
+
+ if visibility == 0
+ message.expire!
+ @messages << message
+ delete_message(receipt)
+ else
+ message.expire_at(visibility)
+ end
+
+ end
+ end
+
def delete_message(receipt)
- messages_in_flight.delete(receipt)
+ with_lock do
+ @messages_in_flight.delete(receipt)
+ end
end
def reset
- @messages = []
- @messages_in_flight = {}
+ with_lock do
+ @messages = []
+ @messages_view = FakeSQS::CollectionView.new(@messages)
+ reset_messages_in_flight
+ end
end
def expire
- @messages += messages_in_flight.values
- @messages_in_flight = {}
+ with_lock do
+ @messages += @messages_in_flight.values
+ reset_messages_in_flight
+ end
end
+ def reset_messages_in_flight
+ with_lock do
+ @messages_in_flight = {}
+ @messages_in_flight_view = FakeSQS::CollectionView.new(@messages_in_flight)
+ end
+ end
+
+ def messages
+ @messages_view
+ end
+
+ def messages_in_flight
+ @messages_in_flight_view
+ end
+
def size
messages.size
end
def generate_receipt
SecureRandom.hex
+ end
+
+ def with_lock
+ @lock.synchronize do
+ yield
+ end
end
end
end