lib/zookeeper.rb in zookeeper-0.2.2 vs lib/zookeeper.rb in zookeeper-0.3.0

- old
+ new

@@ -1,78 +1,234 @@ # Ruby wrapper for the Zookeeper C API -# Phillip Pearson <pp@myelin.co.nz> require 'zookeeper_c' +require 'thread' +require 'zookeeper/callbacks' +require 'zookeeper/constants' +require 'zookeeper/exceptions' +require 'zookeeper/stat' +require 'zookeeper/acls' -class ZkStat - attr_reader :version - def initialize(ary) - @czxid, @mzxid, @ctime, @mtime, @version, @cversion, @aversion, @ephemeralOwner = ary - end -end - class Zookeeper < CZookeeper - def initialize(host) + include ZookeeperCallbacks + include ZookeeperConstants + include ZookeeperExceptions + include ZookeeperACLs + include ZookeeperStat + + ZKRB_GLOBAL_CB_REQ = -1 + + # debug levels + ZOO_LOG_LEVEL_ERROR = 1 + ZOO_LOG_LEVEL_WARN = 2 + ZOO_LOG_LEVEL_INFO = 3 + ZOO_LOG_LEVEL_DEBUG = 4 + + def initialize(host, timeout = 10) + @watcher_reqs = { ZKRB_GLOBAL_CB_REQ => { :watcher => get_default_global_watcher } } + @completion_reqs = {} + @req_mutex = Mutex.new + @current_req_id = 1 super(host) - @watchers = {} # path => [ block, block, ... ] + + if timeout > 0 + time_to_stop = Time.now + timeout + until state == Zookeeper::ZOO_CONNECTED_STATE + break if Time.now > time_to_stop + sleep 0.1 + end + + return nil if state != Zookeeper::ZOO_CONNECTED_STATE + end + + setup_dispatch_thread! end + +public + def get(options = {}) + assert_supported_keys(options, [:path, :watcher, :watcher_context, :callback, :callback_context]) + assert_required_keys(options, [:path]) + + req_id = setup_call(options) + rc, value, stat = super(req_id, options[:path], options[:callback], options[:watcher]) - def exists(path, &blk) - (@watchers[path] ||= []) << blk if blk - ZkStat.new(super(path, !!blk)) + rv = { :req_id => req_id, :rc => rc } + options[:callback] ? rv : rv.merge(:data => value, :stat => Stat.new(stat)) end + + def set(options = {}) + assert_supported_keys(options, [:path, :data, :version, :callback, :callback_context]) + assert_required_keys(options, [:path]) + options[:version] ||= -1 - def stat(path, &blk) - exists(path, &blk) - rescue Zookeeper::NoNodeError - nil + req_id = setup_call(options) + rc, stat = super(req_id, options[:path], options[:data], options[:callback], options[:version]) + + rv = { :req_id => req_id, :rc => rc } + options[:callback] ? rv : rv.merge(:stat => Stat.new(stat)) end + + def get_children(options = {}) + assert_supported_keys(options, [:path, :callback, :callback_context, :watcher, :watcher_context]) + assert_required_keys(options, [:path]) - def get(path) - value, stat = super - [value, ZkStat.new(stat)] + req_id = setup_call(options) + rc, children, stat = super(req_id, options[:path], options[:callback], options[:watcher]) + + rv = { :req_id => req_id, :rc => rc } + options[:callback] ? rv : rv.merge(:children => children, :stat => Stat.new(stat)) end - def try_acquire(path, value) - # create the parent node if it doesn't exist already - create(path, "lock node", 0) unless stat(path) + def stat(options = {}) + assert_supported_keys(options, [:path, :callback, :callback_context, :watcher, :watcher_context]) + assert_required_keys(options, [:path]) - # attempt to obtain the lock - realpath = create("#{path}/lock-", value, Zookeeper::ZOO_EPHEMERAL | Zookeeper::ZOO_SEQUENCE) - #puts "created lock node #{realpath}" + req_id = setup_call(options) + rc, stat = exists(req_id, options[:path], options[:callback], options[:watcher]) - # see if we got it - serial = /lock-(\d+)$/.match(realpath).captures[0].to_i - have_lock = true - ls(path).each do |child| - if m = /lock-(\d+)$/.match(child) - if m.captures[0].to_i < serial - have_lock = false - break - end + rv = { :req_id => req_id, :rc => rc } + options[:callback] ? rv : rv.merge(:stat => Stat.new(stat)) + end + + def create(options = {}) + assert_supported_keys(options, [:path, :data, :acl, :ephemeral, :sequence, :callback, :callback_context]) + assert_required_keys(options, [:path]) + + flags = 0 + flags |= ZOO_EPHEMERAL if options[:ephemeral] + flags |= ZOO_SEQUENCE if options[:sequence] + + options[:acl] ||= ZOO_OPEN_ACL_UNSAFE + + req_id = setup_call(options) + rc, newpath = super(req_id, options[:path], options[:data], options[:callback], options[:acl], flags) + + rv = { :req_id => req_id, :rc => rc } + options[:callback] ? rv : rv.merge(:path => newpath) + end + + def delete(options = {}) + assert_supported_keys(options, [:path, :version, :callback, :callback_context]) + assert_required_keys(options, [:path]) + options[:version] ||= -1 + + req_id = setup_call(options) + rc = super(req_id, options[:path], options[:version], options[:callback]) + + { :req_id => req_id, :rc => rc } + end + + def set_acl(options = {}) + assert_supported_keys(options, [:path, :acl, :version, :callback, :callback_context]) + assert_required_keys(options, [:path, :acl]) + options[:version] ||= -1 + + req_id = setup_call(options) + rc = super(req_id, options[:path], options[:acl], options[:callback], options[:version]) + + { :req_id => req_id, :rc => rc } + end + + def get_acl(options = {}) + assert_supported_keys(options, [:path, :callback, :callback_context]) + assert_required_keys(options, [:path]) + + req_id = setup_call(options) + rc, acls, stat = super(req_id, options[:path], options[:callback]) + + rv = { :req_id => req_id, :rc => rc } + options[:callback] ? rv : rv.merge(:acl => acls, :stat => Stat.new(stat)) + end + +private + def setup_dispatch_thread! + @dispatcher = Thread.new { + while true do + dispatch_next_callback + sleep 0.1 end - end + } + end - # call block - yield(have_lock) + def dispatch_next_callback + hash = get_next_event + return nil unless hash - # release the lock - #puts "deleting #{realpath}" - delete(realpath, stat(realpath).version) + is_completion = hash.has_key?(:rc) + + hash[:stat] = Stat.new(hash[:stat]) if hash.has_key?(:stat) + hash[:acl] = hash[:acl].map { |acl| ACL.new(acl) } if hash[:acl] + + callback_context = is_completion ? get_completion(hash[:req_id]) : get_watcher(hash[:req_id]) + callback = is_completion ? callback_context[:callback] : callback_context[:watcher] + hash[:context] = callback_context[:context] + + # TODO: Eventually enforce derivation from Zookeeper::Callback + if callback.respond_to?(:call) + callback.call(hash) + else + puts "dispatch_next_callback found non-callback => #{callback.inspect}" + end end + + def setup_call(opts) + req_id = nil + @req_mutex.synchronize { + req_id = @current_req_id + @current_req_id += 1 + setup_completion(req_id, opts) if opts[:callback] + setup_watcher(req_id, opts) if opts[:watcher] + } + req_id + end + + def setup_watcher(req_id, call_opts) + @watcher_reqs[req_id] = { :watcher => call_opts[:watcher], + :context => call_opts[:watcher_context] } + end - def watcher(type, state, path) - raise Exception("watchers don't work in ruby yet") # ... until I figure out how to synchronize access to the Ruby interpreter + def setup_completion(req_id, call_opts) + @completion_reqs[req_id] = { :callback => call_opts[:callback], + :context => call_opts[:callback_context] } + end + + def get_watcher(req_id) + @req_mutex.synchronize { + req_id != ZKRB_GLOBAL_CB_REQ ? @watcher_reqs.delete(req_id) : @watcher_reqs[req_id] + } + end + + def get_completion(req_id) + @req_mutex.synchronize { @completion_reqs.delete(req_id) } + end - return unless type == ZOO_SESSION_EVENT +public + # TODO: Sanitize user mistakes by unregistering watchers from ops that + # don't return ZOK (except wexists)? Make users clean up after themselves for now. + def unregister_watcher(req_id) + @req_mutex.synchronize { + @watcher_reqs.delete(req_id) + } + end - case state - when ZOO_CONNECTED_STATE - puts "ruby watcher; got an event for #{path}" +private + def get_default_global_watcher + Proc.new { |args| + puts "Ruby ZK Global CB called type=#{event_by_value(args[:type])} state=#{state_by_value(args[:state])}" + } + end - when ZOO_AUTH_FAILED_STATE - raise Exception, "auth failure" - when ZOO_EXPIRED_SESSION_STATE - raise Exception, "session expired" + def assert_supported_keys(args, supported) + unless (args.keys - supported).empty? + raise ZookeeperException::BadArguments, + "Supported arguments are: #{supported.inspect}, but arguments #{args.keys.inspect} were supplied instead" end end + + def assert_required_keys(args, required) + unless (required - args.keys).empty? + raise ZookeeperException::BadArguments, + "Required arguments are: #{required.inspect}, but only the arguments #{args.keys.inspect} were supplied." + end + end end +