module ZK # A ruby-friendly wrapper around the low-level zookeeper drivers. This is the # class that you will likely interact with the most. # class Client extend Forwardable DEFAULT_TIMEOUT = 10 attr_reader :event_handler attr_reader :cnx #:nodoc: # for backwards compatibility alias :watcher :event_handler #:nodoc: #:stopdoc: STATE_SYM_MAP = { Zookeeper::ZOO_CLOSED_STATE => :closed, Zookeeper::ZOO_EXPIRED_SESSION_STATE => :expired_session, Zookeeper::ZOO_AUTH_FAILED_STATE => :auth_failed, Zookeeper::ZOO_CONNECTING_STATE => :connecting, Zookeeper::ZOO_CONNECTED_STATE => :connected, Zookeeper::ZOO_ASSOCIATING_STATE => :associating, }.freeze #:startdoc: # Create a new client and connect to the zookeeper server. # # +host+ should be a string of comma-separated host:port pairs. You can # also supply an optional "chroot" suffix that will act as an implicit # prefix to all paths supplied. # # example: # # ZK::Client.new("zk01:2181,zk02:2181/chroot/path") # def initialize(host, opts={}) @event_handler = EventHandler.new(self) yield self if block_given? @cnx = ::Zookeeper.new(host, DEFAULT_TIMEOUT, @event_handler.get_default_watcher_block) @threadpool = Threadpool.new end # Queue an operation to be run on an internal threadpool. You may either # provide an object that responds_to?(:call) or pass a block. There is no # mechanism for retrieving the result of the operation, it is purely # fire-and-forget, so the user is expected to make arrangements for this in # their code. # # An ArgumentError will be raised if +callable+ does not respond_to?(:call) # # ==== Arguments # * callable: an object that respond_to?(:call), takes precedence # over a given block # def defer(callable=nil, &block) @threadpool.defer(callable, &block) end # returns true if the connection has been closed #-- # XXX: should this be *our* idea of closed or ZOO_CLOSED_STATE ? def closed? defined?(::JRUBY_VERSION) ? jruby_closed? : mri_closed? end private def jruby_closed? @cnx.state == Java::OrgApacheZookeeper::ZooKeeper::States::CLOSED end def mri_closed? @cnx.state or false rescue RuntimeError => e # gah, lame error parsing here raise e if (e.message != 'zookeeper handle is closed') and not defined?(::JRUBY_VERSION) true end public # returns the current state of the connection as reported by the underlying driver # as a symbol. The possible values are [:closed, :expired_session, :auth_failed # :connecting, :connected, :associating]. # # See the Zookeeper session # {documentation}[http://hadoop.apache.org/zookeeper/docs/current/zookeeperProgrammers.html#ch_zkSessions] # for more information # def state if defined?(::JRUBY_VERSION) @cnx.state.to_string.downcase.to_sym else STATE_SYM_MAP.fetch(@cnx.state) { |k| raise IndexError, "unrecognized state: #{k}" } end end # reopen the underlying connection # returns state of connection after operation def reopen(timeout=10) @cnx.reopen(timeout, @event_handler.get_default_watcher_block) @threadpool.start! # restart the threadpool if previously stopped by close! state end # Returns true if the underlying connection is in the +connected+ state. def connected? wrap_state_closed_error { @cnx and @cnx.connected? } end # Returns true if the underlying connection is in the +associating+ state. def associating? wrap_state_closed_error { @cnx and @cnx.associating? } end # Returns true if the underlying connection is in the +connecting+ state. def connecting? wrap_state_closed_error { @cnx and @cnx.connecting? } end # Returns true if the underlying connection is in the +expired_session+ state. def expired_session? return nil unless @cnx if defined?(::JRUBY_VERSION) @cnx.state == Java::OrgApacheZookeeper::ZooKeeper::States::EXPIRED_SESSION else wrap_state_closed_error { @cnx.state == Zookeeper::ZOO_EXPIRED_SESSION_STATE } end end # does a stat on '/', returns true or false def ping? #:nodoc: false unless connected? false|stat('/') rescue ZK::Exceptions::KeeperException false end # Create a node with the given path. The node data will be the given data, # and node acl will be the given acl. The path is returned. # # The ephemeral argument specifies whether the created node will be # ephemeral or not. # # An ephemeral node will be removed by the server automatically when the # session associated with the creation of the node expires. # # The sequence argument can also specify to create a sequential node. The # actual path name of a sequential node will be the given path plus a # suffix "_i" where i is the current sequential number of the node. Once # such a node is created, the sequential number will be incremented by one. # # If a node with the same actual path already exists in the ZooKeeper, a # KeeperException with error code KeeperException::NodeExists will be # thrown. Note that since a different actual path is used for each # invocation of creating sequential node with the same path argument, the # call will never throw a NodeExists KeeperException. # # If the parent node does not exist in the ZooKeeper, a KeeperException # with error code KeeperException::NoNode will be thrown. # # An ephemeral node cannot have children. If the parent node of the given # path is ephemeral, a KeeperException with error code # KeeperException::NoChildrenForEphemerals will be thrown. # # This operation, if successful, will trigger all the watches left on the # node of the given path by exists and get API calls, and the watches left # on the parent node by children API calls. # # If a node is created successfully, the ZooKeeper server will trigger the # watches on the path left by exists calls, and the watches on the parent # of the node by children calls. # # Called with a hash of arguments set. Supports being executed # asynchronousy by passing a callback object. # # ==== Arguments # * path -- path of the node # * data -- initial data for the node, defaults to an empty string # * :acl -- defaults to ACL::OPEN_ACL_UNSAFE, otherwise the ACL for the node # * :ephemeral -- defaults to false, if set to true the created node will be ephemeral # * :sequence -- defaults to false, if set to true the created node will be sequential # * :callback -- provide a AsyncCallback::StringCallback object or # Proc for an asynchronous call to occur # * :context -- context object passed into callback method # * :mode -- may be specified instead of :ephemeral and :sequence, # accepted values are [:ephemeral_sequential, :persistent_sequential, # :persistent, :ephemeral] # # ==== Examples # # ===== create node, no data, persistent # # zk.create("/path") # # => "/path" # # ===== create node, ACL will default to ACL::OPEN_ACL_UNSAFE # # zk.create("/path", "foo") # # => "/path" # # ===== create ephemeral node # zk.create("/path", :mode => :ephemeral) # # => "/path" # # ===== create sequential node # zk.create("/path", :mode => :persistent_sequence) # # => "/path0" # # ===== create ephemeral and sequential node # zk.create("/path", "foo", :mode => :ephemeral_sequence) # # => "/path0" # # ===== create a child path # zk.create("/path/child", "bar") # # => "/path/child" # # ===== create a sequential child path # zk.create("/path/child", "bar", :mode => :ephemeral_sequence) # # => "/path/child0" # #-- # TODO: document asynchronous callback # # ===== create asynchronously with callback object # # class StringCallback # def process_result(return_code, path, context, name) # # do processing here # end # end # # callback = StringCallback.new # context = Object.new # # zk.create("/path", "foo", :callback => callback, :context => context) # # ===== create asynchronously with callback proc # # callback = proc do |return_code, path, context, name| # # do processing here # end # # context = Object.new # # zk.create("/path", "foo", :callback => callback, :context => context) # #++ def create(path, data='', opts={}) h = { :path => path, :data => data, :ephemeral => false, :sequence => false }.merge(opts) if mode = h.delete(:mode) mode = mode.to_sym case mode when :ephemeral_sequential h[:ephemeral] = h[:sequence] = true when :persistent_sequential h[:ephemeral] = false h[:sequence] = true when :persistent h[:ephemeral] = false when :ephemeral h[:ephemeral] = true else raise ArgumentError, "Unknown mode: #{mode.inspect}" end end rv = check_rc(@cnx.create(h), h) h[:callback] ? rv : rv[:path] end # Return the data and stat of the node of the given path. # # If the watch is true and the call is successfull (no exception is # thrown), a watch will be left on the node with the given path. The watch # will be triggered by a successful operation that sets data on the node, # or deletes the node. See +watcher+ for documentation on how to register # blocks to be called when a watch event is fired. # # A KeeperException with error code KeeperException::NoNode will be thrown # if no node with the given path exists. # # Supports being executed asynchronousy by passing a callback object. # # ==== Arguments # * path -- path of the node # * :watch -- defaults to false, set to true if you need to watch this node # * :callback -- provide a AsyncCallback::DataCallback object or # Proc for an asynchronous call to occur # * :context -- context object passed into callback method # # ==== Examples # ===== get data for path # zk.get("/path") # # ===== get data and set watch on node # zk.get("/path", :watch => true) # #-- # ===== get data asynchronously # # class DataCallback # def process_result(return_code, path, context, data, stat) # # do processing here # end # end # # zk.get("/path") do |return_code, path, context, data, stat| # # do processing here # end # # callback = DataCallback.new # context = Object.new # zk.get("/path", :callback => callback, :context => context) #++ def get(path, opts={}) h = { :path => path }.merge(opts) setup_watcher!(:data, h) rv = check_rc(@cnx.get(h), h) opts[:callback] ? rv : rv.values_at(:data, :stat) end # Set the data for the node of the given path if such a node exists and the # given version matches the version of the node (if the given version is # -1, it matches any node's versions). Return the stat of the node. # # This operation, if successful, will trigger all the watches on the node # of the given path left by get_data calls. # # A KeeperException with error code KeeperException::NoNode will be thrown # if no node with the given path exists. A KeeperException with error code # KeeperException::BadVersion will be thrown if the given version does not # match the node's version. # # Called with a hash of arguments set. Supports being executed # asynchronousy by passing a callback object. # # ==== Arguments # * :path -- path of the node # * :data -- data to set # * :version -- defaults to -1, otherwise set to the expected matching version # * :callback -- provide a AsyncCallback::StatCallback object or # Proc for an asynchronous call to occur # * :context -- context object passed into callback method # # ==== Examples # zk.set("/path", "foo") # zk.set("/path", "foo", :version => 0) # #-- # ===== set data asynchronously # # class StatCallback # def process_result(return_code, path, context, stat) # # do processing here # end # end # # callback = StatCallback.new # context = Object.new # # zk.set("/path", "foo", :callback => callback, :context => context) #++ def set(path, data, opts={}) h = { :path => path, :data => data }.merge(opts) rv = check_rc(@cnx.set(h), h) opts[:callback] ? nil : rv[:stat] end # Return the stat of the node of the given path. Return nil if the node # doesn't exist. # # If the watch is true and the call is successful (no exception is thrown), # a watch will be left on the node with the given path. The watch will be # triggered by a successful operation that creates/delete the node or sets # the data on the node. # # Can be called with just the path, otherwise a hash with the arguments # set. Supports being executed asynchronousy by passing a callback object. # # ==== Arguments # * path -- path of the node # * :watch -- defaults to false, set to true if you need to watch # this node # * :callback -- provide a AsyncCallback::StatCallback object or # Proc for an asynchronous call to occur # * :context -- context object passed into callback method # # ==== Examples # ===== exists for path # zk.stat("/path") # # => ZK::Stat # # ===== exists for path with watch set # zk.stat("/path", :watch => true) # # => ZK::Stat # # ===== exists for non existent path # zk.stat("/non_existent_path") # # => nil # #-- # ===== exist node asynchronously # # class StatCallback # def process_result(return_code, path, context, stat) # # do processing here # end # end # # callback = StatCallback.new # context = Object.new # # zk.exists?("/path", :callback => callback, :context => context) #++ def stat(path, opts={}) h = { :path => path }.merge(opts) setup_watcher!(:data, h) rv = @cnx.stat(h) return rv if opts[:callback] case rv[:rc] when Zookeeper::ZOK, Zookeeper::ZNONODE rv[:stat] else check_rc(rv, h) # throws the appropriate error end end # sugar around stat # # ===== instead of # zk.stat('/path').exists? # # => true # # ===== you can do # zk.exists?('/path') # # => true # # this only works for the synchronous version of stat. for async version, # this method will act *exactly* like stat # def exists?(path, opts={}) rv = stat(path, opts) opts[:callback] ? rv : rv.exists? end # closes the underlying connection and deregisters all callbacks def close! @event_handler.clear! wrap_state_closed_error { @cnx.close } @threadpool.shutdown nil end # Delete the node with the given path. The call will succeed if such a node # exists, and the given version matches the node's version (if the given # version is -1, it matches any node's versions). # # A KeeperException with error code KeeperException::NoNode will be thrown # if the nodes does not exist. # # A KeeperException with error code KeeperException::BadVersion will be # thrown if the given version does not match the node's version. # # A KeeperException with error code KeeperException::NotEmpty will be # thrown if the node has children. # # This operation, if successful, will trigger all the watches on the node # of the given path left by exists API calls, and the watches on the parent # node left by children API calls. # # Can be called with just the path, otherwise a hash with the arguments # set. Supports being executed asynchronousy by passing a callback object. # # ==== Arguments # * path -- path of the node to be deleted # * :version -- defaults to -1 (deletes any version), otherwise # set to the expected matching version # * :callback -- provide a AsyncCallback::VoidCallback object or # Proc for an asynchronous call to occur # * :context -- context object passed into callback method # # ==== Examples # zk.delete("/path") # zk.delete("/path", :version => 0) # #-- # ===== delete node asynchronously # # class VoidCallback # def process_result(return_code, path, context) # # do processing here # end # end # # callback = VoidCallback.new # context = Object.new # # zk.delete(/path", :callback => callback, :context => context) #++ def delete(path, opts={}) h = { :path => path, :version => -1 }.merge(opts) rv = check_rc(@cnx.delete(h), h) nil end # Return the list of the children of the node of the given path. # # If the watch is true and the call is successful (no exception is thrown), # a watch will be left on the node with the given path. The watch will be # triggered by a successful operation that deletes the node of the given # path or creates/delete a child under the node. See +watcher+ for # documentation on how to register blocks to be called when a watch event # is fired. # # A KeeperException with error code KeeperException::NoNode will be thrown # if no node with the given path exists. # # Can be called with just the path, otherwise a hash with the arguments # set. Supports being executed asynchronousy by passing a callback object. # # ==== Arguments # * path -- path of the node # * :watch -- defaults to false, set to true if you need to watch # this node # * :callback -- provide a AsyncCallback::ChildrenCallback object # or Proc for an asynchronous call to occur # * :context -- context object passed into callback method # # ==== Examples # ===== get children for path # zk.create("/path", :data => "foo") # zk.create("/path/child", :data => "child1", :sequence => true) # zk.create("/path/child", :data => "child2", :sequence => true) # zk.children("/path") # # => ["child0", "child1"] # # ====== get children and set watch # zk.children("/path", :watch => true) # # => ["child0", "child1"] # #-- # ===== get children asynchronously # # class ChildrenCallback # def process_result(return_code, path, context, children) # # do processing here # end # end # # callback = ChildrenCallback.new # context = Object.new # zk.children("/path", :callback => callback, :context => context) #++ def children(path, opts={}) h = { :path => path }.merge(opts) setup_watcher!(:child, h) rv = check_rc(@cnx.get_children(h), h) opts[:callback] ? nil : rv[:children] end # Return the ACL and stat of the node of the given path. # # A KeeperException with error code KeeperException::Code::NoNode will be # thrown if no node with the given path exists. # # Can be called with just the path, otherwise a hash with the arguments # set. Supports being executed asynchronousy by passing a callback object. # # ==== Arguments # * path -- path of the node # * :stat -- defaults to nil, provide a Stat object that will be # set with the Stat information of the node path (TODO: test this) # * :callback -- provide a AsyncCallback::AclCallback object or # Proc for an asynchronous call to occur # * :context -- context object passed into callback method # # ==== Examples # ===== get acl # zk.get_acl("/path") # # => [ACL] # # ===== get acl with stat # stat = ZK::Stat.new # zk.get_acl("/path", :stat => stat) # #-- # ===== get acl asynchronously # # class AclCallback # def processResult(return_code, path, context, acl, stat) # # do processing here # end # end # # callback = AclCallback.new # context = Object.new # zk.acls("/path", :callback => callback, :context => context) #++ def get_acl(path, opts={}) h = { :path => path }.merge(opts) rv = check_rc(@cnx.get_acl(h), h) opts[:callback] ? nil : rv.values_at(:children, :stat) end # Set the ACL for the node of the given path if such a node exists and the # given version matches the version of the node. Return the stat of the # node. # # A KeeperException with error code KeeperException::Code::NoNode will be # thrown if no node with the given path exists. # # A KeeperException with error code KeeperException::Code::BadVersion will # be thrown if the given version does not match the node's version. # # Called with a hash of arguments set. Supports being executed # asynchronousy by passing a callback object. # # ==== Arguments # * path -- path of the node # * :acl -- acl to set # * :version -- defaults to -1, otherwise set to the expected matching version # * :callback -- provide a AsyncCallback::StatCallback object or # Proc for an asynchronous call to occur # * :context -- context object passed into callback method # # ==== Examples # TBA - waiting on clarification of method use # def set_acl(path, acls, opts={}) h = { :path => path, :acl => acls }.merge(opts) rv = check_rc(@cnx.set_acl(h), h) opts[:callback] ? nil : rv[:stat] end #-- # # EXTENSIONS # # convenience methods for dealing with zookeeper (rm -rf, mkdir -p, etc) # #++ # Creates all parent paths and 'path' in zookeeper as persistent nodes with # zero data. # # ==== Arguments # * path: An absolute znode path to create # # ==== Examples # # zk.exists?('/path') # # => false # # zk.mkdir_p('/path/to/blah') # # => "/path/to/blah" # #-- # TODO: write a non-recursive version of this. ruby doesn't have TCO, so # this could get expensive w/ psychotically long paths def mkdir_p(path) create(path, '', :mode => :persistent) rescue Exceptions::NodeExists return rescue Exceptions::NoNode if File.dirname(path) == '/' # ok, we're screwed, blow up raise KeeperException, "could not create '/', something is wrong", caller end mkdir_p(File.dirname(path)) retry end # recursively remove all children of path then remove path itself def rm_rf(paths) Array(paths).flatten.each do |path| begin children(path).each do |child| rm_rf(File.join(path, child)) end delete(path) nil rescue Exceptions::NoNode end end end # see ZK::Find for explanation def find(*paths, &block) ZK::Find.find(self, *paths, &block) end # will block the caller until +abs_node_path+ has been removed # # NOTE: this is dangerous to use in callbacks! there is only one # event-delivery thread, so if you use this method in a callback or # watcher, you *will* deadlock! def block_until_node_deleted(abs_node_path) queue = Queue.new ev_sub = nil node_deletion_cb = lambda do |event| if event.node_deleted? queue.enq(:deleted) else queue.enq(:deleted) unless exists?(abs_node_path, :watch => true) end end ev_sub = watcher.register(abs_node_path, &node_deletion_cb) # set up the callback, but bail if we don't need to wait return true unless exists?(abs_node_path, :watch => true) queue.pop # block waiting for node deletion true ensure # be sure we clean up after ourselves ev_sub.unregister if ev_sub end # creates a new locker based on the name you send in # # see ZK::Locker::ExclusiveLocker # # returns a ZK::Locker::ExclusiveLocker instance using this Client and provided # lock name # # ==== Arguments # * name name of the lock you wish to use # # ==== Examples # # zk.locker("blah") # # => # # def locker(name) Locker.exclusive_locker(self, name) end # create a new shared locking instance based on the name given # # returns a ZK::Locker::SharedLocker instance using this Client and provided # lock name # # ==== Arguments # * name name of the lock you wish to use # # ==== Examples # # zk.shared_locker("blah") # # => # # def shared_locker(name) Locker.shared_locker(self, name) end # Convenience method for acquiring a lock then executing a code block. This # will block the caller until the lock is acquired. # # ==== Arguments # * name: the name of the lock to use # * :mode: either :shared or :exclusive, defaults to :exclusive # # ==== Examples # # zk.with_lock('foo') do # # this code is executed while holding the lock # end # def with_lock(name, opts={}, &b) mode = opts[:mode] || :exclusive raise ArgumentError, ":mode option must be either :shared or :exclusive, not #{mode.inspect}" unless [:shared, :exclusive].include?(mode) if mode == :shared shared_locker(name).with_lock(&b) else locker(name).with_lock(&b) end end # Convenience method for constructing a ZK::Election::Candidate object using this # Client connection, the given election +name+ and +data+. # def election_candidate(name, data, opts={}) opts = opts.merge(:data => data) ZK::Election::Candidate.new(self, name, opts) end # Convenience method for constructing a ZK::Election::Observer object using this # Client connection, and the given election +name+. # def election_observer(name, opts={}) ZK::Election::Observer.new(self, name, opts) end # creates a new message queue of name +name+ # # returns a ZK::MessageQueue object # # ==== Arguments # * name the name of the queue # # ==== Examples # # zk.queue("blah").publish({:some_data => "that is yaml serializable"}) # def queue(name) MessageQueue.new(self, name) end def set_debug_level(level) #:nodoc: if defined?(::JRUBY_VERSION) warn "set_debug_level is not implemented for JRuby" return else num = case level when String, Symbol ZookeeperBase.const_get(:"ZOO_LOG_LEVEL_#{level.to_s.upcase}") rescue NameError when Integer level end raise ArgumentError, "#{level.inspect} is not a valid argument to set_debug_level" unless num @cnx.set_debug_level(num) end end # Register a block to be called on connection, when the client has # connected. The block will *always* be called asynchronously (on a # background thread). # # the block will be called with no arguments # # returns an EventHandlerSubscription object that can be used to unregister # this block from further updates # def on_connected(&block) watcher.register_state_handler(:connected, &block).tap do defer { block.call } if connected? end end # register a block to be called when the client is attempting to reconnect # to the zookeeper server. the documentation says that this state should be # taken to mean that the application should enter into "safe mode" and operate # conservatively, as it won't be getting updates until it has reconnected # def on_connecting(&block) watcher.register_state_handler(:connecting, &block).tap do defer { block.call } if connecting? end end # register a block to be called when our session has expired. This usually happens # due to a network partitioning event, and means that all callbacks and watches must # be re-registered with the server #--- # NOTE: need to come up with a way to test this def on_expired_session(&block) watcher.register_state_handler(:expired_session, &block).tap do defer { block.call } if expired_session? end end protected def wrap_state_closed_error yield rescue RuntimeError => e # gah, lame error parsing here raise e unless e.message == 'zookeeper handle is closed' false end def check_rc(hash, inputs=nil) hash.tap do |h| if code = h[:rc] msg = inputs ? "inputs: #{inputs.inspect}" : nil raise Exceptions::KeeperException.by_code(code), msg unless code == Zookeeper::ZOK end end end def setup_watcher!(watch_type, opts) @event_handler.setup_watcher!(watch_type, opts) end end end