lib/z_k/event_handler.rb in zk-0.8.1 vs lib/z_k/event_handler.rb in zk-0.8.2
- old
+ new
@@ -1,9 +1,10 @@
module ZK
- # this is the default watcher provided by the zookeeper connection
+ # This is the default watcher provided by the zookeeper connection
# watchers are implemented by adding the :watch => true flag to
# any #children or #get or #exists calls
+ #
# you never really need to initialize this yourself
class EventHandler
include org.apache.zookeeper.Watcher if defined?(JRUBY_VERSION)
include ZK::Logging
@@ -30,20 +31,54 @@
h.tap { |x| x[k] = Set.new }
end
end
# register a path with the handler
+ #
# your block will be called with all events on that path.
- # aliased as #subscribe
+ #
+ # @note All watchers are one-shot handlers. After an event is delivered to
+ # your handler, you *must* re-watch the node to receive more events. This
+ # leads to a pattern you will find throughout ZK code that avoids races,
+ # see the example below "avoiding a race"
+ #
+ # @example avoiding a race waiting for a node to be deleted
+ #
+ # # we expect that '/path/to/node' exists currently and want to be notified
+ # # when it's deleted
+ #
+ # # register a handler that will be called back when an event occurs on
+ # # node
+ # #
+ # node_subscription = zk.event_handler.register('/path/to/node') do |event|
+ # if event.node_deleted?
+ # do_something_when_node_deleted
+ # end
+ # end
+ #
+ # # check to see if our condition is true *while* setting a watch on the node
+ # # if our condition happens to be true while setting the watch
+ # #
+ # unless exists?('/path/to/node', :watch => true)
+ # node_subscription.unsubscribe # cancel the watch
+ # do_something_when_node_deleted # call the callback
+ # end
+ #
+ #
# @param [String] path the path you want to listen to
+ #
# @param [Block] block the block to execute when a watch event happpens
- # @yield [connection, event] We will call your block with the connection the
- # watch event occured on and the event object
+ #
+ # @yield [event] We will call your block with the watch event object (which
+ # has the connection the event occurred on as its #zk attribute)
+ #
# @return [ZooKeeper::EventHandlerSubscription] the subscription object
# you can use to to unsubscribe from an event
+ #
# @see ZooKeeper::WatcherEvent
- # @see ZooKeeper::EventHandlerSubscription
+ # @see ZK::EventHandlerSubscription
+ #
def register(path, &block)
# logger.debug { "EventHandler#register path=#{path.inspect}" }
EventHandlerSubscription.new(self, path, block).tap do |subscription|
synchronize { @callbacks[path] << subscription }
end
@@ -52,27 +87,28 @@
# registers a "state of the connection" handler
#
# @param [String] state the state you want to register for
# @param [Block] block the block to execute on state changes
- # @yield [connection, event] yields your block with
+ # @yield [event] yields your block with
+ #
def register_state_handler(state, &block)
register(state_key(state), &block)
end
# @deprecated use #unsubscribe on the subscription object
- # @see ZooKeeper::EventHandlerSubscription#unsubscribe
+ # @see ZK::EventHandlerSubscription#unsubscribe
def unregister_state_handler(*args)
if args.first.is_a?(EventHandlerSubscription)
unregister(args.first)
else
unregister(state_key(args.first), args[1])
end
end
# @deprecated use #unsubscribe on the subscription object
- # @see ZooKeeper::EventHandlerSubscription#unsubscribe
+ # @see ZK::EventHandlerSubscription#unsubscribe
def unregister(*args)
if args.first.is_a?(EventHandlerSubscription)
subscription = args.first
elsif args.first.is_a?(String) and args[1].is_a?(EventHandlerSubscription)
subscription = args[1]
@@ -91,11 +127,12 @@
nil
end
alias :unsubscribe :unregister
# called from the client-registered callback when an event fires
- def process(event) #:nodoc:
+ # @private
+ def process(event)
# logger.debug { "EventHandler#process dispatching event: #{event.inspect}" }# unless event.type == -1
event.zk = @zk
cb_key =
if event.node_event?
@@ -125,21 +162,24 @@
safe_call(cb_ary, event)
end
# used during shutdown to clear registered listeners
+ # @private
def clear! #:nodoc:
synchronize do
@callbacks.clear
nil
end
end
- def synchronize #:nodoc:
+ # @private
+ def synchronize
@mutex.synchronize { yield }
end
+ # @private
def get_default_watcher_block
@default_watcher_block ||= lambda do |hash|
watcher_callback.tap do |cb|
cb.call(hash)
end
@@ -149,10 +189,12 @@
# implements not only setting up the watcher callback, but deduplicating
# event delivery. Keeps track of in-flight watcher-type+path requests and
# doesn't re-register the watcher with the server until a response has been
# fired. This prevents one event delivery to *every* callback per :watch => true
# argument.
+ #
+ # @private
def setup_watcher!(watch_type, opts)
return unless opts.delete(:watch)
synchronize do
set = @outstanding_watches.fetch(watch_type)
@@ -167,14 +209,16 @@
end
end
end
protected
+ # @private
def watcher_callback
ZookeeperCallbacks::WatcherCallback.create { |event| process(event) }
end
+ # @private
def state_key(arg)
int =
case arg
when String, Symbol
ZookeeperConstants.const_get(:"ZOO_#{arg.to_s.upcase}_STATE")
@@ -187,9 +231,10 @@
"state_#{int}"
rescue NameError
raise ArgumentError, "#{arg} is not a valid zookeeper state", caller
end
+ # @private
def safe_call(callbacks, *args)
while cb = callbacks.shift
begin
cb.call(*args) if cb.respond_to?(:call)
rescue Exception => e