lib/zookeeper.rb in zookeeper-0.4.3 vs lib/zookeeper.rb in zookeeper-0.4.4
- old
+ new
@@ -20,11 +20,11 @@
# debug levels
ZOO_LOG_LEVEL_ERROR = 1
ZOO_LOG_LEVEL_WARN = 2
ZOO_LOG_LEVEL_INFO = 3
ZOO_LOG_LEVEL_DEBUG = 4
-
+
def reopen(timeout = 10)
init(@host)
if timeout > 0
time_to_stop = Time.now + timeout
until state == Zookeeper::ZOO_CONNECTED_STATE
@@ -32,36 +32,36 @@
sleep 0.1
end
end
# flushes all outstanding watcher reqs.
@watcher_reqs = { ZKRB_GLOBAL_CB_REQ => { :watcher => get_default_global_watcher } }
+ setup_dispatch_thread!
state
end
def initialize(host, timeout = 10)
@watcher_reqs = {}
@completion_reqs = {}
@req_mutex = Mutex.new
@current_req_id = 1
@host = host
return nil if reopen(timeout) != Zookeeper::ZOO_CONNECTED_STATE
- setup_dispatch_thread!
end
public
def get(options = {})
assert_open
assert_supported_keys(options, [:path, :watcher, :watcher_context, :callback, :callback_context])
assert_required_keys(options, [:path])
-
+
req_id = setup_call(options)
rc, value, stat = super(req_id, options[:path], options[:callback], options[:watcher])
rv = { :req_id => req_id, :rc => rc }
options[:callback] ? rv : rv.merge(:data => value, :stat => Stat.new(stat))
end
-
+
def set(options = {})
assert_open
assert_supported_keys(options, [:path, :data, :version, :callback, :callback_context])
assert_required_keys(options, [:path])
options[:version] ||= -1
@@ -70,11 +70,11 @@
rc, stat = super(req_id, options[:path], options[:data], options[:callback], options[:version])
rv = { :req_id => req_id, :rc => rc }
options[:callback] ? rv : rv.merge(:stat => Stat.new(stat))
end
-
+
def get_children(options = {})
assert_open
assert_supported_keys(options, [:path, :callback, :callback_context, :watcher, :watcher_context])
assert_required_keys(options, [:path])
@@ -94,120 +94,129 @@
rc, stat = exists(req_id, options[:path], options[:callback], options[:watcher])
rv = { :req_id => req_id, :rc => rc }
options[:callback] ? rv : rv.merge(:stat => Stat.new(stat))
end
-
+
def create(options = {})
assert_open
assert_supported_keys(options, [:path, :data, :acl, :ephemeral, :sequence, :callback, :callback_context])
assert_required_keys(options, [:path])
-
+
flags = 0
flags |= ZOO_EPHEMERAL if options[:ephemeral]
flags |= ZOO_SEQUENCE if options[:sequence]
options[:acl] ||= ZOO_OPEN_ACL_UNSAFE
-
+
req_id = setup_call(options)
rc, newpath = super(req_id, options[:path], options[:data], options[:callback], options[:acl], flags)
-
+
rv = { :req_id => req_id, :rc => rc }
- options[:callback] ? rv : rv.merge(:path => newpath)
+ options[:callback] ? rv : rv.merge(:path => newpath)
end
-
+
def delete(options = {})
assert_open
assert_supported_keys(options, [:path, :version, :callback, :callback_context])
assert_required_keys(options, [:path])
options[:version] ||= -1
-
+
req_id = setup_call(options)
rc = super(req_id, options[:path], options[:version], options[:callback])
-
+
{ :req_id => req_id, :rc => rc }
end
def set_acl(options = {})
assert_open
assert_supported_keys(options, [:path, :acl, :version, :callback, :callback_context])
assert_required_keys(options, [:path, :acl])
options[:version] ||= -1
-
+
req_id = setup_call(options)
rc = super(req_id, options[:path], options[:acl], options[:callback], options[:version])
-
+
{ :req_id => req_id, :rc => rc }
end
-
+
def get_acl(options = {})
assert_open
assert_supported_keys(options, [:path, :callback, :callback_context])
assert_required_keys(options, [:path])
-
+
req_id = setup_call(options)
rc, acls, stat = super(req_id, options[:path], options[:callback])
-
+
rv = { :req_id => req_id, :rc => rc }
options[:callback] ? rv : rv.merge(:acl => acls, :stat => Stat.new(stat))
end
+ # To close a Zk handle, first shutdown the dispatcher thread; this is done by
+ # signalling the waiting thread that there is a pending close. We then release
+ # the C-land Zk state.
+ def close
+ signal_pending_close
+ @dispatcher.join
+ super
+ end
+
private
def setup_dispatch_thread!
@dispatcher = Thread.new {
while true do
- dispatch_next_callback
+ hash = get_next_event
+ break if hash.nil? # Pending close => exit dispatcher thread
+ dispatch_event(hash)
end
}
end
- def dispatch_next_callback
- hash = get_next_event
-
+ def dispatch_event(hash)
is_completion = hash.has_key?(:rc)
-
+
hash[:stat] = Stat.new(hash[:stat]) if hash.has_key?(:stat)
hash[:acl] = hash[:acl].map { |acl| ACL.new(acl) } if hash[:acl]
-
+
callback_context = is_completion ? get_completion(hash[:req_id]) : get_watcher(hash[:req_id])
callback = is_completion ? callback_context[:callback] : callback_context[:watcher]
hash[:context] = callback_context[:context]
-
+
# TODO: Eventually enforce derivation from Zookeeper::Callback
if callback.respond_to?(:call)
callback.call(hash)
else
# puts "dispatch_next_callback found non-callback => #{callback.inspect}"
end
end
-
+
def setup_call(opts)
req_id = nil
@req_mutex.synchronize {
req_id = @current_req_id
@current_req_id += 1
setup_completion(req_id, opts) if opts[:callback]
setup_watcher(req_id, opts) if opts[:watcher]
}
req_id
end
-
+
def setup_watcher(req_id, call_opts)
@watcher_reqs[req_id] = { :watcher => call_opts[:watcher],
:context => call_opts[:watcher_context] }
end
def setup_completion(req_id, call_opts)
@completion_reqs[req_id] = { :callback => call_opts[:callback],
:context => call_opts[:callback_context] }
end
-
+
def get_watcher(req_id)
@req_mutex.synchronize {
req_id != ZKRB_GLOBAL_CB_REQ ? @watcher_reqs.delete(req_id) : @watcher_reqs[req_id]
}
end
-
+
def get_completion(req_id)
@req_mutex.synchronize { @completion_reqs.delete(req_id) }
end
public