lib/z_k/client.rb in zk-0.6.5 vs lib/z_k/client.rb in zk-0.7.1
- old
+ new
@@ -1,910 +1,31 @@
module ZK
# A ruby-friendly wrapper around the low-level zookeeper drivers. This is the
# class that you will likely interact with the most.
#
- class Client
- extend Forwardable
-
+ # @todo ACL support is pretty much unused currently.
+ # If anyone has suggestions, hints, use-cases, examples, etc. by all means please file a bug.
+ #
+ module Client
DEFAULT_TIMEOUT = 10
- attr_reader :event_handler
-
- attr_reader :cnx #:nodoc:
-
- # for backwards compatibility
- alias :watcher :event_handler #:nodoc:
-
- #:stopdoc:
+ # @private
STATE_SYM_MAP = {
Zookeeper::ZOO_CLOSED_STATE => :closed,
Zookeeper::ZOO_EXPIRED_SESSION_STATE => :expired_session,
Zookeeper::ZOO_AUTH_FAILED_STATE => :auth_failed,
Zookeeper::ZOO_CONNECTING_STATE => :connecting,
Zookeeper::ZOO_CONNECTED_STATE => :connected,
Zookeeper::ZOO_ASSOCIATING_STATE => :associating,
}.freeze
- #:startdoc:
- # Create a new client and connect to the zookeeper server.
- #
- # +host+ should be a string of comma-separated host:port pairs. You can
- # also supply an optional "chroot" suffix that will act as an implicit
- # prefix to all paths supplied.
- #
- # example:
- #
- # ZK::Client.new("zk01:2181,zk02:2181/chroot/path")
- #
- def initialize(host, opts={})
- @event_handler = EventHandler.new(self)
- yield self if block_given?
- @cnx = ::Zookeeper.new(host, DEFAULT_TIMEOUT, @event_handler.get_default_watcher_block)
- @threadpool = Threadpool.new
+ def self.new(*a, &b)
+ Base.new(*a, &b)
end
-
- # Queue an operation to be run on an internal threadpool. You may either
- # provide an object that responds_to?(:call) or pass a block. There is no
- # mechanism for retrieving the result of the operation, it is purely
- # fire-and-forget, so the user is expected to make arrangements for this in
- # their code.
- #
- # An ArgumentError will be raised if +callable+ does not <tt>respond_to?(:call)</tt>
- #
- # ==== Arguments
- # * <tt>callable</tt>: an object that <tt>respond_to?(:call)</tt>, takes precedence
- # over a given block
- #
- def defer(callable=nil, &block)
- @threadpool.defer(callable, &block)
- end
-
- # returns true if the connection has been closed
- #--
- # XXX: should this be *our* idea of closed or ZOO_CLOSED_STATE ?
- def closed?
- defined?(::JRUBY_VERSION) ? jruby_closed? : mri_closed?
- end
-
- private
- def jruby_closed?
- @cnx.state == Java::OrgApacheZookeeper::ZooKeeper::States::CLOSED
- end
-
- def mri_closed?
- @cnx.state or false
- rescue RuntimeError => e
- # gah, lame error parsing here
- raise e if (e.message != 'zookeeper handle is closed') and not defined?(::JRUBY_VERSION)
- true
- end
-
- public
-
- # returns the current state of the connection as reported by the underlying driver
- # as a symbol. The possible values are <tt>[:closed, :expired_session, :auth_failed
- # :connecting, :connected, :associating]</tt>.
- #
- # See the Zookeeper session
- # {documentation}[http://hadoop.apache.org/zookeeper/docs/current/zookeeperProgrammers.html#ch_zkSessions]
- # for more information
- #
- def state
- if defined?(::JRUBY_VERSION)
- @cnx.state.to_string.downcase.to_sym
- else
- STATE_SYM_MAP.fetch(@cnx.state) { |k| raise IndexError, "unrecognized state: #{k}" }
- end
- end
-
- # reopen the underlying connection
- # returns state of connection after operation
- def reopen(timeout=10)
- @cnx.reopen(timeout, @event_handler.get_default_watcher_block)
- @threadpool.start! # restart the threadpool if previously stopped by close!
- state
- end
-
- # Returns true if the underlying connection is in the +connected+ state.
- def connected?
- wrap_state_closed_error { @cnx and @cnx.connected? }
- end
-
- # Returns true if the underlying connection is in the +associating+ state.
- def associating?
- wrap_state_closed_error { @cnx and @cnx.associating? }
- end
-
- # Returns true if the underlying connection is in the +connecting+ state.
- def connecting?
- wrap_state_closed_error { @cnx and @cnx.connecting? }
- end
-
- # Returns true if the underlying connection is in the +expired_session+ state.
- def expired_session?
- return nil unless @cnx
-
- if defined?(::JRUBY_VERSION)
- @cnx.state == Java::OrgApacheZookeeper::ZooKeeper::States::EXPIRED_SESSION
- else
- wrap_state_closed_error { @cnx.state == Zookeeper::ZOO_EXPIRED_SESSION_STATE }
- end
- end
-
- # does a stat on '/', returns true or false
- def ping? #:nodoc:
- false unless connected?
- false|stat('/')
- rescue ZK::Exceptions::KeeperException
- false
- end
-
- # Create a node with the given path. The node data will be the given data,
- # and node acl will be the given acl. The path is returned.
- #
- # The ephemeral argument specifies whether the created node will be
- # ephemeral or not.
- #
- # An ephemeral node will be removed by the server automatically when the
- # session associated with the creation of the node expires.
- #
- # The sequence argument can also specify to create a sequential node. The
- # actual path name of a sequential node will be the given path plus a
- # suffix "_i" where i is the current sequential number of the node. Once
- # such a node is created, the sequential number will be incremented by one.
- #
- # If a node with the same actual path already exists in the ZooKeeper, a
- # KeeperException with error code KeeperException::NodeExists will be
- # thrown. Note that since a different actual path is used for each
- # invocation of creating sequential node with the same path argument, the
- # call will never throw a NodeExists KeeperException.
- #
- # If the parent node does not exist in the ZooKeeper, a KeeperException
- # with error code KeeperException::NoNode will be thrown.
- #
- # An ephemeral node cannot have children. If the parent node of the given
- # path is ephemeral, a KeeperException with error code
- # KeeperException::NoChildrenForEphemerals will be thrown.
- #
- # This operation, if successful, will trigger all the watches left on the
- # node of the given path by exists and get API calls, and the watches left
- # on the parent node by children API calls.
- #
- # If a node is created successfully, the ZooKeeper server will trigger the
- # watches on the path left by exists calls, and the watches on the parent
- # of the node by children calls.
- #
- # Called with a hash of arguments set. Supports being executed
- # asynchronousy by passing a callback object.
- #
- # ==== Arguments
- # * <tt>path</tt> -- path of the node
- # * <tt>data</tt> -- initial data for the node, defaults to an empty string
- # * <tt>:acl</tt> -- defaults to <tt>ACL::OPEN_ACL_UNSAFE</tt>, otherwise the ACL for the node
- # * <tt>:ephemeral</tt> -- defaults to false, if set to true the created node will be ephemeral
- # * <tt>:sequence</tt> -- defaults to false, if set to true the created node will be sequential
- # * <tt>:callback</tt> -- provide a AsyncCallback::StringCallback object or
- # Proc for an asynchronous call to occur
- # * <tt>:context</tt> -- context object passed into callback method
- # * <tt>:mode</tt> -- may be specified instead of :ephemeral and :sequence,
- # accepted values are <tt>[:ephemeral_sequential, :persistent_sequential,
- # :persistent, :ephemeral]</tt>
- #
- # ==== Examples
- #
- # ===== create node, no data, persistent
- #
- # zk.create("/path")
- # # => "/path"
- #
- # ===== create node, ACL will default to ACL::OPEN_ACL_UNSAFE
- #
- # zk.create("/path", "foo")
- # # => "/path"
- #
- # ===== create ephemeral node
- # zk.create("/path", :mode => :ephemeral)
- # # => "/path"
- #
- # ===== create sequential node
- # zk.create("/path", :mode => :persistent_sequence)
- # # => "/path0"
- #
- # ===== create ephemeral and sequential node
- # zk.create("/path", "foo", :mode => :ephemeral_sequence)
- # # => "/path0"
- #
- # ===== create a child path
- # zk.create("/path/child", "bar")
- # # => "/path/child"
- #
- # ===== create a sequential child path
- # zk.create("/path/child", "bar", :mode => :ephemeral_sequence)
- # # => "/path/child0"
- #
- #--
- # TODO: document asynchronous callback
- #
- # ===== create asynchronously with callback object
- #
- # class StringCallback
- # def process_result(return_code, path, context, name)
- # # do processing here
- # end
- # end
- #
- # callback = StringCallback.new
- # context = Object.new
- #
- # zk.create("/path", "foo", :callback => callback, :context => context)
- #
- # ===== create asynchronously with callback proc
- #
- # callback = proc do |return_code, path, context, name|
- # # do processing here
- # end
- #
- # context = Object.new
- #
- # zk.create("/path", "foo", :callback => callback, :context => context)
- #
- #++
- def create(path, data='', opts={})
- h = { :path => path, :data => data, :ephemeral => false, :sequence => false }.merge(opts)
-
- if mode = h.delete(:mode)
- mode = mode.to_sym
-
- case mode
- when :ephemeral_sequential
- h[:ephemeral] = h[:sequence] = true
- when :persistent_sequential
- h[:ephemeral] = false
- h[:sequence] = true
- when :persistent
- h[:ephemeral] = false
- when :ephemeral
- h[:ephemeral] = true
- else
- raise ArgumentError, "Unknown mode: #{mode.inspect}"
- end
- end
-
- rv = check_rc(@cnx.create(h), h)
-
- h[:callback] ? rv : rv[:path]
- end
-
- # Return the data and stat of the node of the given path.
- #
- # If the watch is true and the call is successfull (no exception is
- # thrown), a watch will be left on the node with the given path. The watch
- # will be triggered by a successful operation that sets data on the node,
- # or deletes the node. See +watcher+ for documentation on how to register
- # blocks to be called when a watch event is fired.
- #
- # A KeeperException with error code KeeperException::NoNode will be thrown
- # if no node with the given path exists.
- #
- # Supports being executed asynchronousy by passing a callback object.
- #
- # ==== Arguments
- # * <tt>path</tt> -- path of the node
- # * <tt>:watch</tt> -- defaults to false, set to true if you need to watch this node
- # * <tt>:callback</tt> -- provide a AsyncCallback::DataCallback object or
- # Proc for an asynchronous call to occur
- # * <tt>:context</tt> -- context object passed into callback method
- #
- # ==== Examples
- # ===== get data for path
- # zk.get("/path")
- #
- # ===== get data and set watch on node
- # zk.get("/path", :watch => true)
- #
- #--
- # ===== get data asynchronously
- #
- # class DataCallback
- # def process_result(return_code, path, context, data, stat)
- # # do processing here
- # end
- # end
- #
- # zk.get("/path") do |return_code, path, context, data, stat|
- # # do processing here
- # end
- #
- # callback = DataCallback.new
- # context = Object.new
- # zk.get("/path", :callback => callback, :context => context)
- #++
- def get(path, opts={})
- h = { :path => path }.merge(opts)
-
- setup_watcher!(:data, h)
-
- rv = check_rc(@cnx.get(h), h)
-
- opts[:callback] ? rv : rv.values_at(:data, :stat)
- end
-
- # Set the data for the node of the given path if such a node exists and the
- # given version matches the version of the node (if the given version is
- # -1, it matches any node's versions). Return the stat of the node.
- #
- # This operation, if successful, will trigger all the watches on the node
- # of the given path left by get_data calls.
- #
- # A KeeperException with error code KeeperException::NoNode will be thrown
- # if no node with the given path exists. A KeeperException with error code
- # KeeperException::BadVersion will be thrown if the given version does not
- # match the node's version.
- #
- # Called with a hash of arguments set. Supports being executed
- # asynchronousy by passing a callback object.
- #
- # ==== Arguments
- # * <tt>:path</tt> -- path of the node
- # * <tt>:data</tt> -- data to set
- # * <tt>:version</tt> -- defaults to -1, otherwise set to the expected matching version
- # * <tt>:callback</tt> -- provide a AsyncCallback::StatCallback object or
- # Proc for an asynchronous call to occur
- # * <tt>:context</tt> -- context object passed into callback method
- #
- # ==== Examples
- # zk.set("/path", "foo")
- # zk.set("/path", "foo", :version => 0)
- #
- #--
- # ===== set data asynchronously
- #
- # class StatCallback
- # def process_result(return_code, path, context, stat)
- # # do processing here
- # end
- # end
- #
- # callback = StatCallback.new
- # context = Object.new
- #
- # zk.set("/path", "foo", :callback => callback, :context => context)
- #++
- def set(path, data, opts={})
- h = { :path => path, :data => data }.merge(opts)
-
- rv = check_rc(@cnx.set(h), h)
-
- opts[:callback] ? nil : rv[:stat]
- end
-
- # Return the stat of the node of the given path. Return nil if the node
- # doesn't exist.
- #
- # If the watch is true and the call is successful (no exception is thrown),
- # a watch will be left on the node with the given path. The watch will be
- # triggered by a successful operation that creates/delete the node or sets
- # the data on the node.
- #
- # Can be called with just the path, otherwise a hash with the arguments
- # set. Supports being executed asynchronousy by passing a callback object.
- #
- # ==== Arguments
- # * <tt>path</tt> -- path of the node
- # * <tt>:watch</tt> -- defaults to false, set to true if you need to watch
- # this node
- # * <tt>:callback</tt> -- provide a AsyncCallback::StatCallback object or
- # Proc for an asynchronous call to occur
- # * <tt>:context</tt> -- context object passed into callback method
- #
- # ==== Examples
- # ===== exists for path
- # zk.stat("/path")
- # # => ZK::Stat
- #
- # ===== exists for path with watch set
- # zk.stat("/path", :watch => true)
- # # => ZK::Stat
- #
- # ===== exists for non existent path
- # zk.stat("/non_existent_path")
- # # => nil
- #
- #--
- # ===== exist node asynchronously
- #
- # class StatCallback
- # def process_result(return_code, path, context, stat)
- # # do processing here
- # end
- # end
- #
- # callback = StatCallback.new
- # context = Object.new
- #
- # zk.exists?("/path", :callback => callback, :context => context)
- #++
- def stat(path, opts={})
- h = { :path => path }.merge(opts)
-
- setup_watcher!(:data, h)
-
- rv = @cnx.stat(h)
-
- return rv if opts[:callback]
-
- case rv[:rc]
- when Zookeeper::ZOK, Zookeeper::ZNONODE
- rv[:stat]
- else
- check_rc(rv, h) # throws the appropriate error
- end
- end
-
- # sugar around stat
- #
- # ===== instead of
- # zk.stat('/path').exists?
- # # => true
- #
- # ===== you can do
- # zk.exists?('/path')
- # # => true
- #
- # this only works for the synchronous version of stat. for async version,
- # this method will act *exactly* like stat
- #
- def exists?(path, opts={})
- rv = stat(path, opts)
- opts[:callback] ? rv : rv.exists?
- end
-
- # closes the underlying connection and deregisters all callbacks
- def close!
- @event_handler.clear!
- wrap_state_closed_error { @cnx.close }
- @threadpool.shutdown
- nil
- end
-
- # Delete the node with the given path. The call will succeed if such a node
- # exists, and the given version matches the node's version (if the given
- # version is -1, it matches any node's versions).
- #
- # A KeeperException with error code KeeperException::NoNode will be thrown
- # if the nodes does not exist.
- #
- # A KeeperException with error code KeeperException::BadVersion will be
- # thrown if the given version does not match the node's version.
- #
- # A KeeperException with error code KeeperException::NotEmpty will be
- # thrown if the node has children.
- #
- # This operation, if successful, will trigger all the watches on the node
- # of the given path left by exists API calls, and the watches on the parent
- # node left by children API calls.
- #
- # Can be called with just the path, otherwise a hash with the arguments
- # set. Supports being executed asynchronousy by passing a callback object.
- #
- # ==== Arguments
- # * <tt>path</tt> -- path of the node to be deleted
- # * <tt>:version</tt> -- defaults to -1 (deletes any version), otherwise
- # set to the expected matching version
- # * <tt>:callback</tt> -- provide a AsyncCallback::VoidCallback object or
- # Proc for an asynchronous call to occur
- # * <tt>:context</tt> -- context object passed into callback method
- #
- # ==== Examples
- # zk.delete("/path")
- # zk.delete("/path", :version => 0)
- #
- #--
- # ===== delete node asynchronously
- #
- # class VoidCallback
- # def process_result(return_code, path, context)
- # # do processing here
- # end
- # end
- #
- # callback = VoidCallback.new
- # context = Object.new
- #
- # zk.delete(/path", :callback => callback, :context => context)
- #++
- def delete(path, opts={})
- h = { :path => path, :version => -1 }.merge(opts)
- rv = check_rc(@cnx.delete(h), h)
- nil
- end
-
- # Return the list of the children of the node of the given path.
- #
- # If the watch is true and the call is successful (no exception is thrown),
- # a watch will be left on the node with the given path. The watch will be
- # triggered by a successful operation that deletes the node of the given
- # path or creates/delete a child under the node. See +watcher+ for
- # documentation on how to register blocks to be called when a watch event
- # is fired.
- #
- # A KeeperException with error code KeeperException::NoNode will be thrown
- # if no node with the given path exists.
- #
- # Can be called with just the path, otherwise a hash with the arguments
- # set. Supports being executed asynchronousy by passing a callback object.
- #
- # ==== Arguments
- # * <tt>path</tt> -- path of the node
- # * <tt>:watch</tt> -- defaults to false, set to true if you need to watch
- # this node
- # * <tt>:callback</tt> -- provide a AsyncCallback::ChildrenCallback object
- # or Proc for an asynchronous call to occur
- # * <tt>:context</tt> -- context object passed into callback method
- #
- # ==== Examples
- # ===== get children for path
- # zk.create("/path", :data => "foo")
- # zk.create("/path/child", :data => "child1", :sequence => true)
- # zk.create("/path/child", :data => "child2", :sequence => true)
- # zk.children("/path")
- # # => ["child0", "child1"]
- #
- # ====== get children and set watch
- # zk.children("/path", :watch => true)
- # # => ["child0", "child1"]
- #
- #--
- # ===== get children asynchronously
- #
- # class ChildrenCallback
- # def process_result(return_code, path, context, children)
- # # do processing here
- # end
- # end
- #
- # callback = ChildrenCallback.new
- # context = Object.new
- # zk.children("/path", :callback => callback, :context => context)
- #++
- def children(path, opts={})
- h = { :path => path }.merge(opts)
-
- setup_watcher!(:child, h)
-
- rv = check_rc(@cnx.get_children(h), h)
- opts[:callback] ? nil : rv[:children]
- end
-
- # Return the ACL and stat of the node of the given path.
- #
- # A KeeperException with error code KeeperException::Code::NoNode will be
- # thrown if no node with the given path exists.
- #
- # Can be called with just the path, otherwise a hash with the arguments
- # set. Supports being executed asynchronousy by passing a callback object.
- #
- # ==== Arguments
- # * <tt>path</tt> -- path of the node
- # * <tt>:stat</tt> -- defaults to nil, provide a Stat object that will be
- # set with the Stat information of the node path (TODO: test this)
- # * <tt>:callback</tt> -- provide a AsyncCallback::AclCallback object or
- # Proc for an asynchronous call to occur
- # * <tt>:context</tt> -- context object passed into callback method
- #
- # ==== Examples
- # ===== get acl
- # zk.get_acl("/path")
- # # => [ACL]
- #
- # ===== get acl with stat
- # stat = ZK::Stat.new
- # zk.get_acl("/path", :stat => stat)
- #
- #--
- # ===== get acl asynchronously
- #
- # class AclCallback
- # def processResult(return_code, path, context, acl, stat)
- # # do processing here
- # end
- # end
- #
- # callback = AclCallback.new
- # context = Object.new
- # zk.acls("/path", :callback => callback, :context => context)
- #++
- def get_acl(path, opts={})
- h = { :path => path }.merge(opts)
- rv = check_rc(@cnx.get_acl(h), h)
- opts[:callback] ? nil : rv.values_at(:children, :stat)
- end
-
- # Set the ACL for the node of the given path if such a node exists and the
- # given version matches the version of the node. Return the stat of the
- # node.
- #
- # A KeeperException with error code KeeperException::Code::NoNode will be
- # thrown if no node with the given path exists.
- #
- # A KeeperException with error code KeeperException::Code::BadVersion will
- # be thrown if the given version does not match the node's version.
- #
- # Called with a hash of arguments set. Supports being executed
- # asynchronousy by passing a callback object.
- #
- # ==== Arguments
- # * <tt>path</tt> -- path of the node
- # * <tt>:acl</tt> -- acl to set
- # * <tt>:version</tt> -- defaults to -1, otherwise set to the expected matching version
- # * <tt>:callback</tt> -- provide a AsyncCallback::StatCallback object or
- # Proc for an asynchronous call to occur
- # * <tt>:context</tt> -- context object passed into callback method
- #
- # ==== Examples
- # TBA - waiting on clarification of method use
- #
- def set_acl(path, acls, opts={})
- h = { :path => path, :acl => acls }.merge(opts)
- rv = check_rc(@cnx.set_acl(h), h)
- opts[:callback] ? nil : rv[:stat]
- end
-
- #--
- #
- # EXTENSIONS
- #
- # convenience methods for dealing with zookeeper (rm -rf, mkdir -p, etc)
- #
- #++
-
- # Creates all parent paths and 'path' in zookeeper as persistent nodes with
- # zero data.
- #
- # ==== Arguments
- # * <tt>path</tt>: An absolute znode path to create
- #
- # ==== Examples
- #
- # zk.exists?('/path')
- # # => false
- #
- # zk.mkdir_p('/path/to/blah')
- # # => "/path/to/blah"
- #
- #--
- # TODO: write a non-recursive version of this. ruby doesn't have TCO, so
- # this could get expensive w/ psychotically long paths
- def mkdir_p(path)
- create(path, '', :mode => :persistent)
- rescue Exceptions::NodeExists
- return
- rescue Exceptions::NoNode
- if File.dirname(path) == '/'
- # ok, we're screwed, blow up
- raise KeeperException, "could not create '/', something is wrong", caller
- end
-
- mkdir_p(File.dirname(path))
- retry
- end
-
- # recursively remove all children of path then remove path itself
- def rm_rf(paths)
- Array(paths).flatten.each do |path|
- begin
- children(path).each do |child|
- rm_rf(File.join(path, child))
- end
-
- delete(path)
- nil
- rescue Exceptions::NoNode
- end
- end
- end
-
- # see ZK::Find for explanation
- def find(*paths, &block)
- ZK::Find.find(self, *paths, &block)
- end
-
- # will block the caller until +abs_node_path+ has been removed
- #
- # NOTE: this is dangerous to use in callbacks! there is only one
- # event-delivery thread, so if you use this method in a callback or
- # watcher, you *will* deadlock!
- def block_until_node_deleted(abs_node_path)
- queue = Queue.new
- ev_sub = nil
-
- node_deletion_cb = lambda do |event|
- if event.node_deleted?
- queue.enq(:deleted)
- else
- queue.enq(:deleted) unless exists?(abs_node_path, :watch => true)
- end
- end
-
- ev_sub = watcher.register(abs_node_path, &node_deletion_cb)
-
- # set up the callback, but bail if we don't need to wait
- return true unless exists?(abs_node_path, :watch => true)
-
- queue.pop # block waiting for node deletion
- true
- ensure
- # be sure we clean up after ourselves
- ev_sub.unregister if ev_sub
- end
-
- # creates a new locker based on the name you send in
- #
- # see ZK::Locker::ExclusiveLocker
- #
- # returns a ZK::Locker::ExclusiveLocker instance using this Client and provided
- # lock name
- #
- # ==== Arguments
- # * <tt>name</tt> name of the lock you wish to use
- #
- # ==== Examples
- #
- # zk.locker("blah")
- # # => #<ZK::Locker::ExclusiveLocker:0x102034cf8 ...>
- #
- def locker(name)
- Locker.exclusive_locker(self, name)
- end
-
- # create a new shared locking instance based on the name given
- #
- # returns a ZK::Locker::SharedLocker instance using this Client and provided
- # lock name
- #
- # ==== Arguments
- # * <tt>name</tt> name of the lock you wish to use
- #
- # ==== Examples
- #
- # zk.shared_locker("blah")
- # # => #<ZK::Locker::SharedLocker:0x102034cf8 ...>
- #
- def shared_locker(name)
- Locker.shared_locker(self, name)
- end
-
- # Convenience method for acquiring a lock then executing a code block. This
- # will block the caller until the lock is acquired.
- #
- # ==== Arguments
- # * <tt>name</tt>: the name of the lock to use
- # * <tt>:mode</tt>: either :shared or :exclusive, defaults to :exclusive
- #
- # ==== Examples
- #
- # zk.with_lock('foo') do
- # # this code is executed while holding the lock
- # end
- #
- def with_lock(name, opts={}, &b)
- mode = opts[:mode] || :exclusive
-
- raise ArgumentError, ":mode option must be either :shared or :exclusive, not #{mode.inspect}" unless [:shared, :exclusive].include?(mode)
-
- if mode == :shared
- shared_locker(name).with_lock(&b)
- else
- locker(name).with_lock(&b)
- end
- end
-
- # Convenience method for constructing a ZK::Election::Candidate object using this
- # Client connection, the given election +name+ and +data+.
- #
- def election_candidate(name, data, opts={})
- opts = opts.merge(:data => data)
- ZK::Election::Candidate.new(self, name, opts)
- end
-
- # Convenience method for constructing a ZK::Election::Observer object using this
- # Client connection, and the given election +name+.
- #
- def election_observer(name, opts={})
- ZK::Election::Observer.new(self, name, opts)
- end
-
- # creates a new message queue of name +name+
- #
- # returns a ZK::MessageQueue object
- #
- # ==== Arguments
- # * <tt>name</tt> the name of the queue
- #
- # ==== Examples
- #
- # zk.queue("blah").publish({:some_data => "that is yaml serializable"})
- #
- def queue(name)
- MessageQueue.new(self, name)
- end
-
- def set_debug_level(level) #:nodoc:
- if defined?(::JRUBY_VERSION)
- warn "set_debug_level is not implemented for JRuby"
- return
- else
- num =
- case level
- when String, Symbol
- ZookeeperBase.const_get(:"ZOO_LOG_LEVEL_#{level.to_s.upcase}") rescue NameError
- when Integer
- level
- end
-
- raise ArgumentError, "#{level.inspect} is not a valid argument to set_debug_level" unless num
-
- @cnx.set_debug_level(num)
- end
- end
-
- # Register a block to be called on connection, when the client has
- # connected. The block will *always* be called asynchronously (on a
- # background thread).
- #
- # the block will be called with no arguments
- #
- # returns an EventHandlerSubscription object that can be used to unregister
- # this block from further updates
- #
- def on_connected(&block)
- watcher.register_state_handler(:connected, &block).tap do
- defer { block.call } if connected?
- end
- end
-
- # register a block to be called when the client is attempting to reconnect
- # to the zookeeper server. the documentation says that this state should be
- # taken to mean that the application should enter into "safe mode" and operate
- # conservatively, as it won't be getting updates until it has reconnected
- #
- def on_connecting(&block)
- watcher.register_state_handler(:connecting, &block).tap do
- defer { block.call } if connecting?
- end
- end
-
- # register a block to be called when our session has expired. This usually happens
- # due to a network partitioning event, and means that all callbacks and watches must
- # be re-registered with the server
- #---
- # NOTE: need to come up with a way to test this
- def on_expired_session(&block)
- watcher.register_state_handler(:expired_session, &block).tap do
- defer { block.call } if expired_session?
- end
- end
-
- protected
- def wrap_state_closed_error
- yield
- rescue RuntimeError => e
- # gah, lame error parsing here
- raise e unless e.message == 'zookeeper handle is closed'
- false
- end
-
- def check_rc(hash, inputs=nil)
- hash.tap do |h|
- if code = h[:rc]
- msg = inputs ? "inputs: #{inputs.inspect}" : nil
- raise Exceptions::KeeperException.by_code(code), msg unless code == Zookeeper::ZOK
- end
- end
- end
-
- def setup_watcher!(watch_type, opts)
- @event_handler.setup_watcher!(watch_type, opts)
- end
end
end
+
+require 'z_k/client/state_mixin'
+require 'z_k/client/unixisms'
+require 'z_k/client/conveniences'
+require 'z_k/client/base'