lib/zk/message_queue.rb in zk-1.1.0 vs lib/zk/message_queue.rb in zk-1.1.1
- old
+ new
@@ -1,11 +1,15 @@
module ZK
# implements a simple message queue based on Zookeeper recipes
+ #
# @see http://hadoop.apache.org/zookeeper/docs/r3.0.0/recipes.html#sc_recipes_Queues
+ #
# these are good for low-volume queues only
+ #
# because of the way zookeeper works, all message *titles* have to be read into memory
# in order to see what message to process next
+ #
# @example
# queue = zk.queue("somequeue")
# queue.publish(some_string)
# queue.poll! # will return one message
# #subscribe will handle messages as they come in
@@ -27,13 +31,16 @@
@zk.create(full_queue_path, "", :mode => :persistent) unless @zk.exists?(full_queue_path)
end
# publish a message to the queue, you can (optionally) use message titles
# to guarantee unique messages in the queue
- # @param [String] data - any arbitrary string value
- # @param optional [String] message_title - specify a unique message title for this
- # message
+ #
+ # @param [String] data any arbitrary string value
+ #
+ # @param [String] message_title specify a unique message title for this
+ # message (optional)
+ #
def publish(data, message_title = nil)
mode = :persistent_sequential
if message_title
mode = :persistent
else
@@ -42,13 +49,13 @@
@zk.create("#{full_queue_path}/#{message_title}", data, :mode => mode)
rescue KeeperException::NodeExists
return false
end
- # you barely ever need to actually use this method
- # but lets you remove a message from the queue by specifying
- # its title
+ # you barely ever need to actually use this method but lets you remove a
+ # message from the queue by specifying its title
+ #
# @param [String] message_title the title of the message to remove
def delete_message(message_title)
full_path = "#{full_queue_path}/#{message_title}"
locker = @zk.locker("#{full_queue_path}/#{message_title}")
if locker.lock!
@@ -62,19 +69,22 @@
return false
end
end
# grab one message from the queue
+ #
# used when you don't want to or can't subscribe
+ #
# @see ZooKeeper::MessageQueue#subscribe
def poll!
find_and_process_next_available(messages)
end
# @example
# # subscribe like this:
# subscribe {|title, data| handle_message!; true}
# # returning true in the block deletes the message, false unlocks and requeues
+ #
# @yield [title, data] yield to your block with the message title and the data of
# the message
def subscribe(&block)
@subscription_block = block
@sub = @zk.register(full_queue_path) do |event, zk|