lib/sqs/queue.rb in qoobaa-sqs-0.1.0 vs lib/sqs/queue.rb in qoobaa-sqs-0.1.1
- old
+ new
@@ -1,104 +1,95 @@
module Sqs
class Queue
extend Forwardable
+ include Parser
attr_reader :path, :name, :service
-
def_instance_delegators :service, :service_request
+ private_class_method :new
def ==(other)
self.name == other.name and self.service == other.service
end
def destroy
- queue_request({ :params => { "Action" => "DeleteQueue" } })
+ delete_queue
true
end
def attributes
- response = queue_request({ :params => { "Action" => "GetQueueAttributes", "AttributeName" => "All" }})
- parse_get_queue_attributes_result(response.body)
+ get_queue_attributes
end
def update_attributes(attributes)
- set_attributes = {}
- attributes.each_with_index do |attribute, i|
- set_attributes["Attribute.#{i + 1}.Name"] = attribute.first.to_s
- set_attributes["Attribute.#{i + 1}.Value"] = attribute.last.to_s
- end
- response = queue_request(:params => set_attributes.merge("Action" => "SetQueueAttributes"))
+ set_queue_attributes(attributes)
+ true
end
def create_message(body)
- response = queue_request(:params => { "Action" => "SendMessage", "MessageBody" => body })
+ send_message("MessageBody" => body)
+ true
end
def message(visibility_timeout = nil)
- params = {}
- params["VisibilityTimeout"] = visibility_timeout.to_s if visibility_timeout
- response = queue_request(:params => params.merge("Action" => "ReceiveMessage"))
- parse_receive_message_result(response.body).first
+ options = {}
+ options["VisibilityTimeout"] = visibility_timeout.to_s if visibility_timeout
+ receive_message(options).first
end
def inspect #:nodoc:
"#<#{self.class}:#{name}>"
end
+ private
+
def initialize(service, url) #:nodoc:
self.service = service
self.url = url
end
- private
-
attr_writer :service, :path
def url=(url)
parsed_url = URI.parse(url)
self.path = parsed_url.path[1..-1]
self.name = parsed_url.path.split("/").last
end
def name=(name)
- raise ArgumentError.new("Invalid queue name: #{name}") unless name_valid?(name)
@name = name
end
def queue_request(options = {})
service_request(options.merge(:path => path))
end
- def name_valid?(name)
- name =~ /[a-zA-Z0-9_-]{1,80}/
+ def delete_queue
+ queue_request("Action" => "DeleteQueue")
end
- def parse_get_queue_attributes_result(xml_body)
- xml = XmlSimple.xml_in(xml_body)
- get_queue_attributes_result = xml["GetQueueAttributesResult"].first
- attributes = get_queue_attributes_result["Attribute"]
- attributes.inject({}) do |result, attribute|
- attribute_name = attribute["Name"].first
- attribute_value = attribute["Value"].first
- result[attribute_name] = attribute_value
- result
+ def get_queue_attributes
+ response = queue_request("Action" => "GetQueueAttributes", "AttributeName" => "All")
+ parse_get_queue_attributes_result(response.body)
+ end
+
+ def set_queue_attributes(options)
+ attributes = {}
+ options.each_with_index do |attribute, i|
+ attributes["Attribute.#{i + 1}.Name"] = attribute.first.to_s
+ attributes["Attribute.#{i + 1}.Value"] = attribute.last.to_s
end
+ queue_request(attributes.merge("Action" => "SetQueueAttributes"))
end
- def parse_receive_message_result(xml_body)
- xml = XmlSimple.xml_in(xml_body)
- receive_message_result = xml["ReceiveMessageResult"].first
- messages = receive_message_result["Message"]
- if messages
- messages.map do |message|
- message_id = message["MessageId"].first
- receipt_handle = message["ReceiptHandle"].first
- md5_of_body = message["MD5OfBody"].first
- body = message["Body"].first
- Message.new(self, :id => message_id, :receipt_handle => receipt_handle, :body_md5 => md5_of_body, :body => body)
- end
- else
- []
+ def send_message(options)
+ queue_request(options.merge("Action" => "SendMessage"))
+ end
+
+ def receive_message(options)
+ response = queue_request(options.merge("Action" => "ReceiveMessage"))
+ parse_receive_message_result(response.body).map do |message_attributes|
+ Message.send(:new, self, message_attributes)
end
end
end
end