lib/zookeeper.rb in zookeeper-0.4.3 vs lib/zookeeper.rb in zookeeper-0.4.4

- old
+ new

@@ -20,11 +20,11 @@ # debug levels ZOO_LOG_LEVEL_ERROR = 1 ZOO_LOG_LEVEL_WARN = 2 ZOO_LOG_LEVEL_INFO = 3 ZOO_LOG_LEVEL_DEBUG = 4 - + def reopen(timeout = 10) init(@host) if timeout > 0 time_to_stop = Time.now + timeout until state == Zookeeper::ZOO_CONNECTED_STATE @@ -32,36 +32,36 @@ sleep 0.1 end end # flushes all outstanding watcher reqs. @watcher_reqs = { ZKRB_GLOBAL_CB_REQ => { :watcher => get_default_global_watcher } } + setup_dispatch_thread! state end def initialize(host, timeout = 10) @watcher_reqs = {} @completion_reqs = {} @req_mutex = Mutex.new @current_req_id = 1 @host = host return nil if reopen(timeout) != Zookeeper::ZOO_CONNECTED_STATE - setup_dispatch_thread! end public def get(options = {}) assert_open assert_supported_keys(options, [:path, :watcher, :watcher_context, :callback, :callback_context]) assert_required_keys(options, [:path]) - + req_id = setup_call(options) rc, value, stat = super(req_id, options[:path], options[:callback], options[:watcher]) rv = { :req_id => req_id, :rc => rc } options[:callback] ? rv : rv.merge(:data => value, :stat => Stat.new(stat)) end - + def set(options = {}) assert_open assert_supported_keys(options, [:path, :data, :version, :callback, :callback_context]) assert_required_keys(options, [:path]) options[:version] ||= -1 @@ -70,11 +70,11 @@ rc, stat = super(req_id, options[:path], options[:data], options[:callback], options[:version]) rv = { :req_id => req_id, :rc => rc } options[:callback] ? rv : rv.merge(:stat => Stat.new(stat)) end - + def get_children(options = {}) assert_open assert_supported_keys(options, [:path, :callback, :callback_context, :watcher, :watcher_context]) assert_required_keys(options, [:path]) @@ -94,120 +94,129 @@ rc, stat = exists(req_id, options[:path], options[:callback], options[:watcher]) rv = { :req_id => req_id, :rc => rc } options[:callback] ? rv : rv.merge(:stat => Stat.new(stat)) end - + def create(options = {}) assert_open assert_supported_keys(options, [:path, :data, :acl, :ephemeral, :sequence, :callback, :callback_context]) assert_required_keys(options, [:path]) - + flags = 0 flags |= ZOO_EPHEMERAL if options[:ephemeral] flags |= ZOO_SEQUENCE if options[:sequence] options[:acl] ||= ZOO_OPEN_ACL_UNSAFE - + req_id = setup_call(options) rc, newpath = super(req_id, options[:path], options[:data], options[:callback], options[:acl], flags) - + rv = { :req_id => req_id, :rc => rc } - options[:callback] ? rv : rv.merge(:path => newpath) + options[:callback] ? rv : rv.merge(:path => newpath) end - + def delete(options = {}) assert_open assert_supported_keys(options, [:path, :version, :callback, :callback_context]) assert_required_keys(options, [:path]) options[:version] ||= -1 - + req_id = setup_call(options) rc = super(req_id, options[:path], options[:version], options[:callback]) - + { :req_id => req_id, :rc => rc } end def set_acl(options = {}) assert_open assert_supported_keys(options, [:path, :acl, :version, :callback, :callback_context]) assert_required_keys(options, [:path, :acl]) options[:version] ||= -1 - + req_id = setup_call(options) rc = super(req_id, options[:path], options[:acl], options[:callback], options[:version]) - + { :req_id => req_id, :rc => rc } end - + def get_acl(options = {}) assert_open assert_supported_keys(options, [:path, :callback, :callback_context]) assert_required_keys(options, [:path]) - + req_id = setup_call(options) rc, acls, stat = super(req_id, options[:path], options[:callback]) - + rv = { :req_id => req_id, :rc => rc } options[:callback] ? rv : rv.merge(:acl => acls, :stat => Stat.new(stat)) end + # To close a Zk handle, first shutdown the dispatcher thread; this is done by + # signalling the waiting thread that there is a pending close. We then release + # the C-land Zk state. + def close + signal_pending_close + @dispatcher.join + super + end + private def setup_dispatch_thread! @dispatcher = Thread.new { while true do - dispatch_next_callback + hash = get_next_event + break if hash.nil? # Pending close => exit dispatcher thread + dispatch_event(hash) end } end - def dispatch_next_callback - hash = get_next_event - + def dispatch_event(hash) is_completion = hash.has_key?(:rc) - + hash[:stat] = Stat.new(hash[:stat]) if hash.has_key?(:stat) hash[:acl] = hash[:acl].map { |acl| ACL.new(acl) } if hash[:acl] - + callback_context = is_completion ? get_completion(hash[:req_id]) : get_watcher(hash[:req_id]) callback = is_completion ? callback_context[:callback] : callback_context[:watcher] hash[:context] = callback_context[:context] - + # TODO: Eventually enforce derivation from Zookeeper::Callback if callback.respond_to?(:call) callback.call(hash) else # puts "dispatch_next_callback found non-callback => #{callback.inspect}" end end - + def setup_call(opts) req_id = nil @req_mutex.synchronize { req_id = @current_req_id @current_req_id += 1 setup_completion(req_id, opts) if opts[:callback] setup_watcher(req_id, opts) if opts[:watcher] } req_id end - + def setup_watcher(req_id, call_opts) @watcher_reqs[req_id] = { :watcher => call_opts[:watcher], :context => call_opts[:watcher_context] } end def setup_completion(req_id, call_opts) @completion_reqs[req_id] = { :callback => call_opts[:callback], :context => call_opts[:callback_context] } end - + def get_watcher(req_id) @req_mutex.synchronize { req_id != ZKRB_GLOBAL_CB_REQ ? @watcher_reqs.delete(req_id) : @watcher_reqs[req_id] } end - + def get_completion(req_id) @req_mutex.synchronize { @completion_reqs.delete(req_id) } end public