lib/z_k/event_handler.rb in zk-0.8.1 vs lib/z_k/event_handler.rb in zk-0.8.2

- old
+ new

@@ -1,9 +1,10 @@ module ZK - # this is the default watcher provided by the zookeeper connection + # This is the default watcher provided by the zookeeper connection # watchers are implemented by adding the :watch => true flag to # any #children or #get or #exists calls + # # you never really need to initialize this yourself class EventHandler include org.apache.zookeeper.Watcher if defined?(JRUBY_VERSION) include ZK::Logging @@ -30,20 +31,54 @@ h.tap { |x| x[k] = Set.new } end end # register a path with the handler + # # your block will be called with all events on that path. - # aliased as #subscribe + # + # @note All watchers are one-shot handlers. After an event is delivered to + # your handler, you *must* re-watch the node to receive more events. This + # leads to a pattern you will find throughout ZK code that avoids races, + # see the example below "avoiding a race" + # + # @example avoiding a race waiting for a node to be deleted + # + # # we expect that '/path/to/node' exists currently and want to be notified + # # when it's deleted + # + # # register a handler that will be called back when an event occurs on + # # node + # # + # node_subscription = zk.event_handler.register('/path/to/node') do |event| + # if event.node_deleted? + # do_something_when_node_deleted + # end + # end + # + # # check to see if our condition is true *while* setting a watch on the node + # # if our condition happens to be true while setting the watch + # # + # unless exists?('/path/to/node', :watch => true) + # node_subscription.unsubscribe # cancel the watch + # do_something_when_node_deleted # call the callback + # end + # + # # @param [String] path the path you want to listen to + # # @param [Block] block the block to execute when a watch event happpens - # @yield [connection, event] We will call your block with the connection the - # watch event occured on and the event object + # + # @yield [event] We will call your block with the watch event object (which + # has the connection the event occurred on as its #zk attribute) + # # @return [ZooKeeper::EventHandlerSubscription] the subscription object # you can use to to unsubscribe from an event + # # @see ZooKeeper::WatcherEvent - # @see ZooKeeper::EventHandlerSubscription + # @see ZK::EventHandlerSubscription + # def register(path, &block) # logger.debug { "EventHandler#register path=#{path.inspect}" } EventHandlerSubscription.new(self, path, block).tap do |subscription| synchronize { @callbacks[path] << subscription } end @@ -52,27 +87,28 @@ # registers a "state of the connection" handler # # @param [String] state the state you want to register for # @param [Block] block the block to execute on state changes - # @yield [connection, event] yields your block with + # @yield [event] yields your block with + # def register_state_handler(state, &block) register(state_key(state), &block) end # @deprecated use #unsubscribe on the subscription object - # @see ZooKeeper::EventHandlerSubscription#unsubscribe + # @see ZK::EventHandlerSubscription#unsubscribe def unregister_state_handler(*args) if args.first.is_a?(EventHandlerSubscription) unregister(args.first) else unregister(state_key(args.first), args[1]) end end # @deprecated use #unsubscribe on the subscription object - # @see ZooKeeper::EventHandlerSubscription#unsubscribe + # @see ZK::EventHandlerSubscription#unsubscribe def unregister(*args) if args.first.is_a?(EventHandlerSubscription) subscription = args.first elsif args.first.is_a?(String) and args[1].is_a?(EventHandlerSubscription) subscription = args[1] @@ -91,11 +127,12 @@ nil end alias :unsubscribe :unregister # called from the client-registered callback when an event fires - def process(event) #:nodoc: + # @private + def process(event) # logger.debug { "EventHandler#process dispatching event: #{event.inspect}" }# unless event.type == -1 event.zk = @zk cb_key = if event.node_event? @@ -125,21 +162,24 @@ safe_call(cb_ary, event) end # used during shutdown to clear registered listeners + # @private def clear! #:nodoc: synchronize do @callbacks.clear nil end end - def synchronize #:nodoc: + # @private + def synchronize @mutex.synchronize { yield } end + # @private def get_default_watcher_block @default_watcher_block ||= lambda do |hash| watcher_callback.tap do |cb| cb.call(hash) end @@ -149,10 +189,12 @@ # implements not only setting up the watcher callback, but deduplicating # event delivery. Keeps track of in-flight watcher-type+path requests and # doesn't re-register the watcher with the server until a response has been # fired. This prevents one event delivery to *every* callback per :watch => true # argument. + # + # @private def setup_watcher!(watch_type, opts) return unless opts.delete(:watch) synchronize do set = @outstanding_watches.fetch(watch_type) @@ -167,14 +209,16 @@ end end end protected + # @private def watcher_callback ZookeeperCallbacks::WatcherCallback.create { |event| process(event) } end + # @private def state_key(arg) int = case arg when String, Symbol ZookeeperConstants.const_get(:"ZOO_#{arg.to_s.upcase}_STATE") @@ -187,9 +231,10 @@ "state_#{int}" rescue NameError raise ArgumentError, "#{arg} is not a valid zookeeper state", caller end + # @private def safe_call(callbacks, *args) while cb = callbacks.shift begin cb.call(*args) if cb.respond_to?(:call) rescue Exception => e