module ZK module Client # This class forms the base API for interacting with ZooKeeper. Most people will # want to create instances of the class ZK::Client::Threaded, and the most # convenient way of doing that is through the top-level method `ZK.new` # # @note There is a lot of functionality mixed into the subclasses of this # class! You should take a look at {Unixisms}, {Conveniences}, and # {StateMixin} for a lot of the higher-level functionality! # # @example Create a new default connection # # # if no host:port is given, we connect to localhost:2181 by default # # (convenient for use in tests and in irb/pry) # # zk = ZK.new # # @example Create a new connection, specifying host # # zk = ZK.new('localhost:2181') # # @example For quick tasks, you can use the visitor pattern, (like the File class) # # ZK.open('localhost:2181') do |zk| # # do stuff with connection # end # # # connection is automatically closed # class Base # The Eventhandler is used by client code to register callbacks to handle # events triggerd for given paths. # # @see ZK::Client::Base#register attr_reader :event_handler # the wrapped connection object # @private attr_reader :cnx protected :cnx # @deprecated for backwards compatibility only # use ZK::Client::Base#event_handler instead def watcher event_handler end # returns true if the connection has been closed def closed? # XXX: should this be *our* idea of closed or ZOO_CLOSED_STATE ? defined?(::JRUBY_VERSION) ? jruby_closed? : mri_closed? end # @private def inspect "#<#{self.class.name}:#{object_id} zk_session_id=#{safe_session_id} ...>" end # Create a new client and connect to the zookeeper server. # # @param [String] host should be a string of comma-separated host:port # pairs. You can also supply an optional "chroot" suffix that will act as # an implicit prefix to all paths supplied. # # @see ZK::Client::Threaded#initialize valid options to use with the # synchronous (non-evented) client # # @example Threaded client with two hosts and a chroot path # # ZK::Client.new("zk01:2181,zk02:2181/chroot/path") # # @abstract Overridden in subclasses def initialize(host, opts={}) # no-op end private # @private def jruby_closed? cnx.state == Java::OrgApacheZookeeper::ZooKeeper::States::CLOSED end # @private def mri_closed? cnx.closed? end public # reopen the underlying connection # # The `timeout` param is here mainly for legacy support. # # @param [Numeric] timeout how long should we wait for # the connection to reach a connected state before returning. Note that # the method will not raise and will return whether the connection # reaches the 'connected' state or not. The default is actually to use # the same value that was passed to the constructor for 'timeout' # # @return [Symbol] state of connection after operation def reopen(timeout=nil) timeout ||= @session_timeout # XXX: @session_timeout ? cnx.reopen(timeout) @threadpool.start! # restart the threadpool if previously stopped by close! state end # close the underlying connection and clear all pending events. # def close! event_handler.clear! wrap_state_closed_error { cnx.close unless cnx.closed? } end # Create a node with the given path. The node data will be the given data. # The path is returned. # # If the ephemeral option is given, the znode creaed will be removed by the # server automatically when the session associated with the creation of the # node expires. Note that ephemeral nodes cannot have children. # # The sequence option, if true, will cause the server to create a sequential # node. The actual path name of a sequential node will be the given path # plus a suffix "_i" where i is the current sequential number of the node. # Once such a node is created, the sequential number for the path will be # incremented by one (i.e. the generated path will be unique across all # clients). # # Note that since a different actual path is used for each invocation of # creating sequential node with the same path argument, the call will never # throw a NodeExists exception. # # If a node is created successfully, the ZooKeeper server will trigger the # watches on the path left by exists calls, and the watches on the parent # of the node by children calls. # # @overload create(path, opts={}) # creates a znode at the absolute `path` with blank data and given # options # # @overload create(path, data, opts={}) # creates a znode at the absolute `path` with given data and options # # @option opts [Integer] :acl defaults to ZookeeperACLs::ZOO_OPEN_ACL_UNSAFE, # otherwise the ACL for the node. Should be a `ZOO_*` constant defined under the # ZookeeperACLs module in the zookeeper gem. # # @option opts [bool] :ephemeral (false) if true, the created node will be ephemeral # # @option opts [bool] :sequence (false) if true, the created node will be sequential # # @option opts [bool] :sequential (false) alias for :sequence option. if both are given # an ArgumentError is raised # # @option opts [ZookeeperCallbacks::StringCallback] :callback (nil) provide a callback object # that will be called when the znode has been created # # @option opts [Object] :context (nil) an object passed to the `:callback` # given as the `context` param # # @option opts [:ephemeral_sequential, :persistent_sequential, :persistent, :ephemeral] :mode (nil) # may be specified instead of :ephemeral and :sequence options. If `:mode` *and* either of # the `:ephermeral` or `:sequential` options are given, the `:mode` option will win # # @raise [ZK::Exceptions::NodeExists] if a node with the same `path` already exists # # @raise [ZK::Exceptions::NoNode] if the parent node does not exist # # @raise [ZK::Exceptions::NoChildrenForEphemerals] if the parent node of # the given path is ephemeral # # @return [String] the path created on the server # # @todo Document the asynchronous methods # # @example create node, no data, persistent # # zk.create("/path") # # => "/path" # # @example create node, ACL will default to ACL::OPEN_ACL_UNSAFE # # zk.create("/path", "foo") # # => "/path" # # @example create ephemeral node # # zk.create("/path", '', :mode => :ephemeral) # # => "/path" # # @example create sequential node # # zk.create("/path", '', :sequential => true) # # => "/path0" # # # or you can also do: # # zk.create("/path", '', :mode => :persistent_sequential) # # => "/path0" # # # @example create ephemeral and sequential node # # zk.create("/path", '', :sequence => true, :ephemeral => true) # # => "/path0" # # # or you can also do: # # zk.create("/path", "foo", :mode => :ephemeral_sequential) # # => "/path0" # # @example create a child path # # zk.create("/path/child", "bar") # # => "/path/child" # # @example create a sequential child path # # zk.create("/path/child", "bar", :sequence => true, :ephemeral => true) # # => "/path/child0" # # # or you can also do: # # zk.create("/path/child", "bar", :mode => :ephemeral_sequential) # # => "/path/child0" # # @hidden_example create asynchronously with callback object # # class StringCallback # def process_result(return_code, path, context, name) # # do processing here # end # end # # callback = StringCallback.new # context = Object.new # # zk.create("/path", "foo", :callback => callback, :context => context) # # @hidden_example create asynchronously with callback proc # # callback = proc do |return_code, path, context, name| # # do processing here # end # # context = Object.new # # zk.create("/path", "foo", :callback => callback, :context => context) # def create(path, *args) opts = args.extract_options! # be somewhat strict about how many arguments we accept. if args.length > 1 raise ArgumentError, "create takes path, an optional data argument, and options, you passed: (#{path}, *#{args})" end # argh, terrible documentation bug, allow for :sequential, analagous to :sequence if opts.has_key?(:sequential) if opts.has_key?(:sequence) raise ArgumentError, "Only one of :sequential or :sequence options can be given, opts: #{opts}" end opts[:sequence] = opts.delete(:sequential) end data = args.first || '' h = { :path => path, :data => data, :ephemeral => false, :sequence => false }.merge(opts) if mode = h.delete(:mode) mode = mode.to_sym case mode when :ephemeral_sequential h[:ephemeral] = h[:sequence] = true when :persistent_sequential h[:ephemeral] = false h[:sequence] = true when :persistent h[:ephemeral] = false when :ephemeral h[:ephemeral] = true else raise ArgumentError, "Unknown mode: #{mode.inspect}" end end rv = check_rc(cnx.create(h), h) h[:callback] ? rv : rv[:path] end # Return the data and stat of the node of the given path. # # If `:watch` is true and the call is successful (no exception is # raised), registered watchers on the node will be 'armed'. The watch # will be triggered by a successful operation that sets data on the node, # or deletes the node. See `watcher` for documentation on how to register # blocks to be called when a watch event is fired. # # @todo fix references to Watcher documentation # # Supports being executed asynchronousy by passing a callback object. # # @param [String] path absolute path of the znode # # @option opts [bool] :watch (false) set to true if you want your registered # callbacks for this node to be called on change # # @option opts [ZookeeperCallbacks::DataCallback] :callback to make this call asynchronously # # @option opts [Object] :context an object passed to the `:callback` # given as the `context` param # # @return [Array] a two-element array of ['node data', #] # # @raise [ZK::Exceptions::NoNode] if no node with the given path exists. # # @example get data for path # # zk.get("/path") # # => ['this is the data', #] # # @example get data and set watch on node # # zk.get("/path", :watch => true) # # => ['this is the data', #] # # @hidden_example get data asynchronously # # class DataCallback # def process_result(return_code, path, context, data, stat) # # do processing here # end # end # # zk.get("/path") do |return_code, path, context, data, stat| # # do processing here # end # # callback = DataCallback.new # context = Object.new # zk.get("/path", :callback => callback, :context => context) # def get(path, opts={}) h = { :path => path }.merge(opts) rv = setup_watcher!(:data, h) do check_rc(cnx.get(h), h) end opts[:callback] ? rv : rv.values_at(:data, :stat) end # Set the data for the node of the given path if such a node exists and the # given version matches the version of the node (if the given version is # -1, it matches any node's versions). Passing the version allows you to # perform optimistic locking, in that if someone changes the node's # data "behind your back", your update will fail. Since #create does not # return a ZookeeperStat::Stat object, you should be aware that nodes are # created with version == 0. # # This operation, if successful, will trigger all the watches on the node # of the given path left by get calls. # # @raise [ZK::Exceptions::NoNode] raised if no node with the given path exists # # @raise [ZK::Exceptions::BadVersion] raised if the given version does not # match the node's version # # Called with a hash of arguments set. Supports being executed # asynchronousy by passing a callback object. # # @param [String] path absolute path of the znode # # @param [String] data the data to be set on the znode. Note that setting # the data to the exact same value currently on the node still increments # the node's version and causes watches to be fired. # # @option opts [Integer] :version (-1) matches all versions of a node if the # default is used, otherwise acts as an assertion that the znode has the # supplied version. # # @option opts [ZookeeperCallbacks::StatCallback] :callback will recieve the # ZookeeperStat::Stat object asynchronously # # @option opts [Object] :context an object passed to the `:callback` # given as the `context` param # # @example unconditionally set the data of "/path" # # zk.set("/path", "foo") # # @example set the data of "/path" only if the version is 0 # # zk.set("/path", "foo", :version => 0) # def set(path, data, opts={}) # ===== set data asynchronously # # class StatCallback # def process_result(return_code, path, context, stat) # # do processing here # end # end # # callback = StatCallback.new # context = Object.new # # zk.set("/path", "foo", :callback => callback, :context => context) h = { :path => path, :data => data }.merge(opts) rv = check_rc(cnx.set(h), h) opts[:callback] ? rv : rv[:stat] end # Return the stat of the node of the given path. Return nil if the node # doesn't exist. # # If the watch is true and the call is successful (no exception is thrown), # a watch will be left on the node with the given path. The watch will be # triggered by a successful operation that creates/delete the node or sets # the data on the node. # # Can be called with just the path, otherwise a hash with the arguments # set. Supports being executed asynchronousy by passing a callback object. # # @param [String] path absolute path of the znode # # @option opts [bool] :watch (false) set to true if you want to enable # registered watches on this node # # @option opts [ZookeeperCallbacks::StatCallback] :callback will recieve the # ZookeeperStat::Stat object asynchronously # # @option opts [Object] :context an object passed to the `:callback` # given as the `context` param # # @return [ZookeeperStat::Stat] a stat object of the specified node # # @example get stat for for path # >> zk.stat("/path") # # => ZK::Stat # # @example get stat for path and enable watchers # >> zk.stat("/path", :watch => true) # # => ZK::Stat # # @example exists for non existent path # # >> stat = zk.stat("/non_existent_path") # # => # # >> stat.exists? # # => false # # def stat(path, opts={}) h = { :path => path }.merge(opts) setup_watcher!(:data, h) do rv = cnx.stat(h) return rv if opts[:callback] case rv[:rc] when Zookeeper::ZOK, Zookeeper::ZNONODE rv[:stat] else check_rc(rv, h) # throws the appropriate error end end end # sugar around stat # # @example # # # instead of: # # zk.stat('/path').exists? # # => true # # # you can do: # # zk.exists?('/path') # # => true # # only works for the synchronous version of stat. for async version, # this method will act *exactly* like stat # def exists?(path, opts={}) # XXX: this should use the underlying 'exists' call! rv = stat(path, opts) opts[:callback] ? rv : rv.exists? end # Return the list of the children of the node of the given path. # # If the watch is true and the call is successful (no exception is thrown), # registered watchers of the children of the node will be enabled. The # watch will be triggered by a successful operation that deletes the node # of the given path or creates/delete a child under the node. See `watcher` # for documentation on how to register blocks to be called when a watch # event is fired. # # @note It is important to note that the list of children is _not sorted_. If you # need them to be ordered, you must call `.sort` on the returned array # # @raise [ZK::Exceptions::NoNode] if the node does not exist # # @param [String] path absolute path of the znode # # @option opts [bool] :watch (false) set to true if you want your registered # callbacks for this node to be called on change # # @option opts [ZookeeperCallbacks::StringsCallback] :callback to make this # call asynchronously # # @option opts [Object] :context an object passed to the `:callback` # given as the `context` param # # @example get children for path # # zk.create("/path", :data => "foo") # zk.create("/path/child_0", :data => "child0") # zk.create("/path/child_1", :data => "child1") # zk.children("/path") # # => ["child_0", "child_1"] # # @example get children and set watch # # # same setup as above # # zk.children("/path", :watch => true) # # => ["child_0", "child_1"] # def children(path, opts={}) # ===== get children asynchronously # # class ChildrenCallback # def process_result(return_code, path, context, children) # # do processing here # end # end # # callback = ChildrenCallback.new # context = Object.new # zk.children("/path", :callback => callback, :context => context) h = { :path => path }.merge(opts) rv = setup_watcher!(:child, h) do check_rc(cnx.get_children(h), h) end opts[:callback] ? rv : rv[:children] end # Delete the node with the given path. The call will succeed if such a node # exists, and the given version matches the node's version (if the given # version is -1, it matches any node's versions), and the node has no children. # # This operation, if successful, will trigger all the watches on the node # of the given path left by exists API calls, and the watches on the parent # node left by children API calls. # # Can be called with just the path, otherwise a hash with the arguments # set. Supports being executed asynchronousy by passing a callback object. # # @raise [ZK::Exceptions::NoNode] raised if no node with the given path exists # # @raise [ZK::Exceptions::BadVersion] raised if the given version does not # match the node's version # # @raise [ZK::Exceptions::NotEmpty] raised if the node has children # # @param [String] path absolute path of the znode # # @option opts [Integer] :version (-1) matches all versions of a node if the # default is used, otherwise acts as an assertion that the znode has the # supplied version. # # @option opts [ZookeeperCallbacks::VoidCallback] :callback will be called # asynchronously when the operation is complete # # @option opts [Object] :context an object passed to the `:callback` # given as the `context` param # # @example delete a node # zk.delete("/path") # # @example delete a node with a specific version # zk.delete("/path", :version => 5) # def delete(path, opts={}) # ===== delete node asynchronously # # class VoidCallback # def process_result(return_code, path, context) # # do processing here # end # end # # callback = VoidCallback.new # context = Object.new # # zk.delete(/path", :callback => callback, :context => context) h = { :path => path, :version => -1 }.merge(opts) rv = check_rc(cnx.delete(h), h) opts[:callback] ? rv : nil end # Return the ACL and stat of the node of the given path. # # @todo this method is pretty much untested, YMMV # # @raise [ZK::Exceptions::NoNode] if the parent node does not exist # # @param [String] path absolute path of the znode # # @option opts [ZookeeperStat::Stat] (nil) provide a Stat object that will # be set with the Stat information of the node path # # @option opts [ZookeeperCallback::AclCallback] (nil) :callback for an # asynchronous call to occur # # @option opts [Object] :context (nil) an object passed to the `:callback` # given as the `context` param # # @example get acl # # zk.get_acl("/path") # # => [ACL] # # @example get acl with stat # # stat = ZK::Stat.new # zk.get_acl("/path", :stat => stat) # # => [ACL] # def get_acl(path, opts={}) # ===== get acl asynchronously # # class AclCallback # def processResult(return_code, path, context, acl, stat) # # do processing here # end # end # # callback = AclCallback.new # context = Object.new # zk.acls("/path", :callback => callback, :context => context) h = { :path => path }.merge(opts) rv = check_rc(cnx.get_acl(h), h) opts[:callback] ? rv : rv.values_at(:children, :stat) end # Set the ACL for the node of the given path if such a node exists and the # given version matches the version of the node. Return the stat of the # node. # # @raise [ZK::Exceptions::NoNode] if the parent node does not exist # # @raise [ZK::Exceptions::BadVersion] raised if the given version does not # match the node's version # # @param [String] path absolute path of the znode # # @param [ZookeeperACLs] acls the acls to set on the znode # # @option opts [Integer] :version (-1) matches all versions of a node if the # default is used, otherwise acts as an assertion that the znode has the # supplied version. # # @option opts [ZookeeperCallbacks::VoidCallback] :callback will be called # asynchronously when the operation is complete # # @option opts [Object] :context an object passed to the `:callback` # given as the `context` param # # @todo: TBA - waiting on clarification of method use # def set_acl(path, acls, opts={}) h = { :path => path, :acl => acls }.merge(opts) rv = check_rc(cnx.set_acl(h), h) opts[:callback] ? rv : rv[:stat] end # @private # @todo need to document this a little more def set_debug_level(level) if defined?(::JRUBY_VERSION) warn "set_debug_level is not implemented for JRuby" return else num = case level when String, Symbol ZookeeperBase.const_get(:"ZOO_LOG_LEVEL_#{level.to_s.upcase}") rescue NameError when Integer level end raise ArgumentError, "#{level.inspect} is not a valid argument to set_debug_level" unless num cnx.set_debug_level(num) end end # @return [Fixnum] the session_id of the underlying connection def session_id cnx.session_id end # @return [String] the session_passwd of the underlying connection def session_passwd cnx.session_passwd end # Register a block that should be delivered events for a given path. After # registering a block, you need to call {#get}, {#stat}, or {#children} with the # `:watch => true` option for the block to receive _the next event_ (see note). # {#get} and {#stat} will cause the block to receive events when the path is # created, deleted, or its data is changed. {#children} will cause the block to # receive events about its list of child nodes changing (i.e. being added # or deleted, but *not* their content changing). # # This method will return an {EventHandlerSubscription} instance that can be used # to remove the block from further updates by calling its `.unsubscribe` method. # # You can specify a list of event types after the path that you wish to # receive in your block. This allows you to register different blocks for # different types of events. # # @note All node watchers are one-shot handlers. After an event is delivered to # your handler, you *must* re-watch the node to receive more events. This # leads to a pattern you will find throughout ZK code that avoids races, # see the example below "avoiding a race" # # @example avoiding a race waiting for a node to be deleted # # # we expect that '/path/to/node' exists currently and want to be notified # # when it's deleted # # # register a handler that will be called back when an event occurs on # # node # # # node_subscription = zk.register('/path/to/node') do |event| # if event.node_deleted? # do_something_when_node_deleted # end # end # # # check to see if our condition is true *while* setting a watch on the node # # if our condition happens to be true while setting the watch # # # unless exists?('/path/to/node', :watch => true) # node_subscription.unsubscribe # cancel the watch # do_something_when_node_deleted # call the callback # end # # @example only creation events # # sub = zk.register('/path/to/znode', :only => :created) do |event| # # do something when the node is created # end # # @example only changed or children events # # sub = zk.register('/path/to/znode', :only => [:changed, :child]) do |event| # if event.node_changed? # # do something on change # else # # we know it's a child event # end # end # # @example deprecated 1.0 style interests # # sub = zk.register('/path/to/znode', [:changed, :child]) do |event| # if event.node_changed? # # do something on change # else # # we know it's a child event # end # end # # @param [String,:all] path the znode path you want to listen to, or the # special value :all, that will cause the block to be delivered events # for all znode paths # @param [Block] block the block to execute when a watch event happpens # # @yield [event] We will call your block with the watch event object (which # has the connection the event occurred on as its #zk attribute) # # @return [EventHandlerSubscription] the subscription object # you can use to to unsubscribe from an event # # @overload register(path, interests=nil, &block) # @since 1.0 # # @deprecated use the `:only => :created` form # # @param [Array,Symbol,nil] interests a symbol or array-of-symbols indicating # which events you would like the block to be called for. Valid events # are :created, :deleted, :changed, and :child. If nil, the block will # receive all events # # @overload register(path, opts={}, &block) # @since 1.1 # # @option opts [Array,Symbol,nil] :only (nil) a symbol or array-of-symbols indicating # which events you would like the block to be called for. Valid events # are :created, :deleted, :changed, and :child. If nil, the block will # receive all events # # @see ZooKeeper::WatcherEvent # @see ZK::EventHandlerSubscription # @see https://github.com/slyphon/zk/wiki/Events the wiki page on using events effectively # def register(path, opts={}, &block) event_handler.register(path, opts, &block) end # returns true if the caller is calling from the event dispatch thread def event_dispatch_thread? cnx.event_dispatch_thread? end # @private def assert_we_are_not_on_the_event_dispatch_thread!(msg=nil) msg ||= "blocking method called on dispatch thread" raise Exceptions::EventDispatchThreadException, msg if event_dispatch_thread? end # called directly from the zookeeper event thread with every event, before they # get dispatched to the user callbacks. used by client implementations for # critical events like session_expired, so that we don't compete for # threads in the threadpool. # # @private def raw_event_handler(event) end protected # @private def check_rc(hash, inputs=nil) code = hash[:rc] if code && (code != Zookeeper::ZOK) msg = inputs ? "inputs: #{inputs.inspect}" : nil raise Exceptions::KeeperException.by_code(code), msg else hash end end # @private def setup_watcher!(watch_type, opts, &b) event_handler.setup_watcher!(watch_type, opts, &b) end # used in #inspect, doesn't raise an error if we're not connected def safe_session_id if cnx and cnx.session_id '0x%x' % cnx.session_id end rescue ZookeeperExceptions::ZookeeperException, ZK::Exceptions::KeeperException nil end end # Base end # Client end # ZK