lib/zookeeper.rb in zookeeper-0.2.2 vs lib/zookeeper.rb in zookeeper-0.3.0
- old
+ new
@@ -1,78 +1,234 @@
# Ruby wrapper for the Zookeeper C API
-# Phillip Pearson <pp@myelin.co.nz>
require 'zookeeper_c'
+require 'thread'
+require 'zookeeper/callbacks'
+require 'zookeeper/constants'
+require 'zookeeper/exceptions'
+require 'zookeeper/stat'
+require 'zookeeper/acls'
-class ZkStat
- attr_reader :version
- def initialize(ary)
- @czxid, @mzxid, @ctime, @mtime, @version, @cversion, @aversion, @ephemeralOwner = ary
- end
-end
-
class Zookeeper < CZookeeper
- def initialize(host)
+ include ZookeeperCallbacks
+ include ZookeeperConstants
+ include ZookeeperExceptions
+ include ZookeeperACLs
+ include ZookeeperStat
+
+ ZKRB_GLOBAL_CB_REQ = -1
+
+ # debug levels
+ ZOO_LOG_LEVEL_ERROR = 1
+ ZOO_LOG_LEVEL_WARN = 2
+ ZOO_LOG_LEVEL_INFO = 3
+ ZOO_LOG_LEVEL_DEBUG = 4
+
+ def initialize(host, timeout = 10)
+ @watcher_reqs = { ZKRB_GLOBAL_CB_REQ => { :watcher => get_default_global_watcher } }
+ @completion_reqs = {}
+ @req_mutex = Mutex.new
+ @current_req_id = 1
super(host)
- @watchers = {} # path => [ block, block, ... ]
+
+ if timeout > 0
+ time_to_stop = Time.now + timeout
+ until state == Zookeeper::ZOO_CONNECTED_STATE
+ break if Time.now > time_to_stop
+ sleep 0.1
+ end
+
+ return nil if state != Zookeeper::ZOO_CONNECTED_STATE
+ end
+
+ setup_dispatch_thread!
end
+
+public
+ def get(options = {})
+ assert_supported_keys(options, [:path, :watcher, :watcher_context, :callback, :callback_context])
+ assert_required_keys(options, [:path])
+
+ req_id = setup_call(options)
+ rc, value, stat = super(req_id, options[:path], options[:callback], options[:watcher])
- def exists(path, &blk)
- (@watchers[path] ||= []) << blk if blk
- ZkStat.new(super(path, !!blk))
+ rv = { :req_id => req_id, :rc => rc }
+ options[:callback] ? rv : rv.merge(:data => value, :stat => Stat.new(stat))
end
+
+ def set(options = {})
+ assert_supported_keys(options, [:path, :data, :version, :callback, :callback_context])
+ assert_required_keys(options, [:path])
+ options[:version] ||= -1
- def stat(path, &blk)
- exists(path, &blk)
- rescue Zookeeper::NoNodeError
- nil
+ req_id = setup_call(options)
+ rc, stat = super(req_id, options[:path], options[:data], options[:callback], options[:version])
+
+ rv = { :req_id => req_id, :rc => rc }
+ options[:callback] ? rv : rv.merge(:stat => Stat.new(stat))
end
+
+ def get_children(options = {})
+ assert_supported_keys(options, [:path, :callback, :callback_context, :watcher, :watcher_context])
+ assert_required_keys(options, [:path])
- def get(path)
- value, stat = super
- [value, ZkStat.new(stat)]
+ req_id = setup_call(options)
+ rc, children, stat = super(req_id, options[:path], options[:callback], options[:watcher])
+
+ rv = { :req_id => req_id, :rc => rc }
+ options[:callback] ? rv : rv.merge(:children => children, :stat => Stat.new(stat))
end
- def try_acquire(path, value)
- # create the parent node if it doesn't exist already
- create(path, "lock node", 0) unless stat(path)
+ def stat(options = {})
+ assert_supported_keys(options, [:path, :callback, :callback_context, :watcher, :watcher_context])
+ assert_required_keys(options, [:path])
- # attempt to obtain the lock
- realpath = create("#{path}/lock-", value, Zookeeper::ZOO_EPHEMERAL | Zookeeper::ZOO_SEQUENCE)
- #puts "created lock node #{realpath}"
+ req_id = setup_call(options)
+ rc, stat = exists(req_id, options[:path], options[:callback], options[:watcher])
- # see if we got it
- serial = /lock-(\d+)$/.match(realpath).captures[0].to_i
- have_lock = true
- ls(path).each do |child|
- if m = /lock-(\d+)$/.match(child)
- if m.captures[0].to_i < serial
- have_lock = false
- break
- end
+ rv = { :req_id => req_id, :rc => rc }
+ options[:callback] ? rv : rv.merge(:stat => Stat.new(stat))
+ end
+
+ def create(options = {})
+ assert_supported_keys(options, [:path, :data, :acl, :ephemeral, :sequence, :callback, :callback_context])
+ assert_required_keys(options, [:path])
+
+ flags = 0
+ flags |= ZOO_EPHEMERAL if options[:ephemeral]
+ flags |= ZOO_SEQUENCE if options[:sequence]
+
+ options[:acl] ||= ZOO_OPEN_ACL_UNSAFE
+
+ req_id = setup_call(options)
+ rc, newpath = super(req_id, options[:path], options[:data], options[:callback], options[:acl], flags)
+
+ rv = { :req_id => req_id, :rc => rc }
+ options[:callback] ? rv : rv.merge(:path => newpath)
+ end
+
+ def delete(options = {})
+ assert_supported_keys(options, [:path, :version, :callback, :callback_context])
+ assert_required_keys(options, [:path])
+ options[:version] ||= -1
+
+ req_id = setup_call(options)
+ rc = super(req_id, options[:path], options[:version], options[:callback])
+
+ { :req_id => req_id, :rc => rc }
+ end
+
+ def set_acl(options = {})
+ assert_supported_keys(options, [:path, :acl, :version, :callback, :callback_context])
+ assert_required_keys(options, [:path, :acl])
+ options[:version] ||= -1
+
+ req_id = setup_call(options)
+ rc = super(req_id, options[:path], options[:acl], options[:callback], options[:version])
+
+ { :req_id => req_id, :rc => rc }
+ end
+
+ def get_acl(options = {})
+ assert_supported_keys(options, [:path, :callback, :callback_context])
+ assert_required_keys(options, [:path])
+
+ req_id = setup_call(options)
+ rc, acls, stat = super(req_id, options[:path], options[:callback])
+
+ rv = { :req_id => req_id, :rc => rc }
+ options[:callback] ? rv : rv.merge(:acl => acls, :stat => Stat.new(stat))
+ end
+
+private
+ def setup_dispatch_thread!
+ @dispatcher = Thread.new {
+ while true do
+ dispatch_next_callback
+ sleep 0.1
end
- end
+ }
+ end
- # call block
- yield(have_lock)
+ def dispatch_next_callback
+ hash = get_next_event
+ return nil unless hash
- # release the lock
- #puts "deleting #{realpath}"
- delete(realpath, stat(realpath).version)
+ is_completion = hash.has_key?(:rc)
+
+ hash[:stat] = Stat.new(hash[:stat]) if hash.has_key?(:stat)
+ hash[:acl] = hash[:acl].map { |acl| ACL.new(acl) } if hash[:acl]
+
+ callback_context = is_completion ? get_completion(hash[:req_id]) : get_watcher(hash[:req_id])
+ callback = is_completion ? callback_context[:callback] : callback_context[:watcher]
+ hash[:context] = callback_context[:context]
+
+ # TODO: Eventually enforce derivation from Zookeeper::Callback
+ if callback.respond_to?(:call)
+ callback.call(hash)
+ else
+ puts "dispatch_next_callback found non-callback => #{callback.inspect}"
+ end
end
+
+ def setup_call(opts)
+ req_id = nil
+ @req_mutex.synchronize {
+ req_id = @current_req_id
+ @current_req_id += 1
+ setup_completion(req_id, opts) if opts[:callback]
+ setup_watcher(req_id, opts) if opts[:watcher]
+ }
+ req_id
+ end
+
+ def setup_watcher(req_id, call_opts)
+ @watcher_reqs[req_id] = { :watcher => call_opts[:watcher],
+ :context => call_opts[:watcher_context] }
+ end
- def watcher(type, state, path)
- raise Exception("watchers don't work in ruby yet") # ... until I figure out how to synchronize access to the Ruby interpreter
+ def setup_completion(req_id, call_opts)
+ @completion_reqs[req_id] = { :callback => call_opts[:callback],
+ :context => call_opts[:callback_context] }
+ end
+
+ def get_watcher(req_id)
+ @req_mutex.synchronize {
+ req_id != ZKRB_GLOBAL_CB_REQ ? @watcher_reqs.delete(req_id) : @watcher_reqs[req_id]
+ }
+ end
+
+ def get_completion(req_id)
+ @req_mutex.synchronize { @completion_reqs.delete(req_id) }
+ end
- return unless type == ZOO_SESSION_EVENT
+public
+ # TODO: Sanitize user mistakes by unregistering watchers from ops that
+ # don't return ZOK (except wexists)? Make users clean up after themselves for now.
+ def unregister_watcher(req_id)
+ @req_mutex.synchronize {
+ @watcher_reqs.delete(req_id)
+ }
+ end
- case state
- when ZOO_CONNECTED_STATE
- puts "ruby watcher; got an event for #{path}"
+private
+ def get_default_global_watcher
+ Proc.new { |args|
+ puts "Ruby ZK Global CB called type=#{event_by_value(args[:type])} state=#{state_by_value(args[:state])}"
+ }
+ end
- when ZOO_AUTH_FAILED_STATE
- raise Exception, "auth failure"
- when ZOO_EXPIRED_SESSION_STATE
- raise Exception, "session expired"
+ def assert_supported_keys(args, supported)
+ unless (args.keys - supported).empty?
+ raise ZookeeperException::BadArguments,
+ "Supported arguments are: #{supported.inspect}, but arguments #{args.keys.inspect} were supplied instead"
end
end
+
+ def assert_required_keys(args, required)
+ unless (required - args.keys).empty?
+ raise ZookeeperException::BadArguments,
+ "Required arguments are: #{required.inspect}, but only the arguments #{args.keys.inspect} were supplied."
+ end
+ end
end
+