lib/bunny/queue.rb in celldee-bunny-0.1.0 vs lib/bunny/queue.rb in celldee-bunny-0.1.1

- old
+ new

@@ -22,19 +22,32 @@ Protocol::Queue::Declare.new({ :queue => name, :nowait => false }.merge(opts)) ) raise ProtocolError, "Error declaring queue #{name}" unless client.next_method.is_a?(Protocol::Queue::DeclareOk) end + + def ack + client.send_frame( + Protocol::Basic::Ack.new(:delivery_tag => delivery_tag) + ) + # reset delivery tag + self.delivery_tag = nil + end + def pop(opts = {}) + # do we want the message header? hdr = opts.delete(:header) + # do we want to have to provide an acknowledgement? + ack = opts.delete(:ack) + client.send_frame( Protocol::Basic::Get.new({ :queue => name, :consumer_tag => name, - :no_ack => !opts.delete(:ack), + :no_ack => !ack, :nowait => true }.merge(opts)) ) method = client.next_method @@ -42,10 +55,13 @@ return QUEUE_EMPTY elsif !method.is_a?(Protocol::Basic::GetOk) raise ProtocolError, "Error getting message from queue #{name}" end + # get delivery tag to use for acknowledge + self.delivery_tag = method.delivery_tag if ack + header = client.next_payload msg = client.next_payload raise MessageError, 'unexpected length' if msg.length < header.size hdr ? {:header => header, :payload => msg} : msg @@ -55,25 +71,63 @@ def publish(data, opts = {}) exchange.publish(data, opts) end def message_count - status.first + s = status + s[:message_count] end def consumer_count - status.last + s = status + s[:consumer_count] end - def status(opts = {}, &blk) + def status(opts = {}) client.send_frame( Protocol::Queue::Declare.new({ :queue => name, :passive => true }.merge(opts)) ) method = client.next_method - [method.message_count, method.consumer_count] + {:message_count => method.message_count, :consumer_count => method.consumer_count} end + + def subscribe(opts = {}) + consumer_tag = opts[:consumer_tag] || name + + # ignore the :nowait option if passed, otherwise program will not wait for a + # message to get to the server causing an error + opts.delete(:nowait) + + # do we want the message header? + hdr = opts.delete(:header) + + # do we want to have to provide an acknowledgement? + ack = opts.delete(:ack) + + client.send_frame( + Protocol::Basic::Consume.new({ :queue => name, + :consumer_tag => consumer_tag, + :no_ack => !ack, + :nowait => false }.merge(opts)) + ) + + raise ProtocolError, + "Error subscribing to queue #{name}" unless + client.next_method.is_a?(Protocol::Basic::ConsumeOk) + + method = client.next_method + + # get delivery tag to use for acknowledge + self.delivery_tag = method.delivery_tag if ack + + header = client.next_payload + msg = client.next_payload + raise MessageError, 'unexpected length' if msg.length < header.size + hdr ? {:header => header, :payload => msg} : msg + end + def bind(exchange, opts = {}) exchange = exchange.respond_to?(:name) ? exchange.name : exchange # ignore the :nowait option if passed, otherwise program will hang waiting for a # response that will not be sent by the server @@ -88,10 +142,13 @@ ) raise ProtocolError, "Error binding queue #{name}" unless client.next_method.is_a?(Protocol::Queue::BindOk) + + # return message + BIND_SUCCEEDED end def unbind(exchange, opts = {}) exchange = exchange.respond_to?(:name) ? exchange.name : exchange bindings.delete(exchange) @@ -105,10 +162,13 @@ ) raise ProtocolError, "Error unbinding queue #{name}" unless client.next_method.is_a?(Protocol::Queue::UnbindOk) + + # return message + UNBIND_SUCCEEDED end def delete(opts = {}) # ignore the :nowait option if passed, otherwise program will hang waiting for a # response that will not be sent by the server @@ -121,9 +181,12 @@ raise ProtocolError, "Error deleting queue #{name}" unless client.next_method.is_a?(Protocol::Queue::DeleteOk) client.queues.delete(name) + + # return confirmation + QUEUE_DELETED end private def exchange @exchange ||= Bunny::Exchange.new(client, '', {:type => :direct, :key => name}) \ No newline at end of file