lib/zk/client/threaded.rb in zk-1.5.3 vs lib/zk/client/threaded.rb in zk-1.6.0
- old
+ new
@@ -345,11 +345,11 @@
end
on_tpool ? shutdown_thread : shutdown_thread.join(30)
end
- # {see Base#close}
+ # {see ZK::Client::Base#close}
def close
super
subs, @fork_subs = @fork_subs, []
subs.each(&:unsubscribe)
nil
@@ -363,10 +363,37 @@
# (see Threadpool#on_exception)
def on_exception(&blk)
@threadpool.on_exception(&blk)
end
+ def closed?
+ return true if @mutex.synchronize { @client_state == CLOSED }
+ super
+ end
+
+ # this is where the :on option is implemented for {Base#create}
+ def create(path, *args)
+ opts = args.extract_options!
+
+ or_opt = opts.delete(:or)
+ args << opts
+
+ if or_opt
+ hash = parse_create_args(path, *args)
+
+ raise ArgumentError, "valid options for :or are nil or :set, not #{or_opt.inspect}" unless or_opt == :set
+ raise ArgumentError, "you cannot create an ephemeral node when using the :or option" if hash[:ephemeral]
+ raise ArgumentError, "you cannot create an sequence node when using the :or option" if hash[:sequence]
+
+ mkdir_p(path, :data => hash[:data])
+ path
+ else
+ # ok, none of our business, hand it up to mangement
+ super(path, *args)
+ end
+ end
+
# @private
def raw_event_handler(event)
return unless event.session_event?
@mutex.synchronize do
@@ -376,47 +403,51 @@
end
rescue Exception => e
logger.error { "BUG: Exception caught in raw_event_handler: #{e.to_std_format}" }
end
- def closed?
- return true if @mutex.synchronize { @client_state == CLOSED }
- super
- end
-
- # are we in running (not-paused) state?
# @private
- def running?
- @mutex.synchronize { @client_state == RUNNING }
- end
-
- # are we in paused state?
- # @private
- def paused?
- @mutex.synchronize { @client_state == PAUSED }
- end
-
- # has shutdown time arrived?
- # @private
- def close_requested?
- @mutex.synchronize { @client_state == CLOSE_REQ }
- end
-
- # @private
def wait_until_connected_or_dying(timeout)
time_to_stop = Time.now + timeout
@mutex.synchronize do
- while (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) && (Time.now < time_to_stop) && (@client_state == RUNNING)
- @cond.wait(timeout)
+ while true
+ now = Time.now
+ break if (@last_cnx_state == Zookeeper::ZOO_CONNECTED_STATE) || (now > time_to_stop) || (@client_state != RUNNING)
+ deadline = time_to_stop.to_f - now.to_f
+ @cond.wait(deadline)
end
- logger.debug { "@last_cnx_state: #{@last_cnx_state.inspect}, time_left? #{Time.now.to_f < time_to_stop.to_f}, @client_state: #{@client_state.inspect}" }
+ logger.debug { "#{__method__} @last_cnx_state: #{@last_cnx_state.inspect}, time_left? #{Time.now.to_f < time_to_stop.to_f}, @client_state: #{@client_state.inspect}" }
end
end
+ # @private
+ def client_state
+ @mutex.synchronize { @client_state }
+ end
+
private
+ # are we in running (not-paused) state?
+ def running?
+ @client_state == RUNNING
+ end
+
+ # are we in paused state?
+ def paused?
+ @client_state == PAUSED
+ end
+
+ # has shutdown time arrived?
+ def close_requested?
+ @client_state == CLOSE_REQ
+ end
+
+ def dead_or_dying?
+ (@client_state == CLOSE_REQ) || (@client_state == CLOSED)
+ end
+
# this is just here so we can see it in stack traces
def reopen_after_session_expired
reopen
end
@@ -431,15 +462,15 @@
Thread.current[:name] = 'reconnect'
while @reconnect # too clever?
@mutex.synchronize do
# either we havne't seen a valid session update from this
# connection yet, or we're doing fine, so just wait
- @cond.wait_while { !seen_session_state_event? or (valid_session_state? and (@client_state == RUNNING)) }
+ @cond.wait_while { !seen_session_state_event? or (valid_session_state? and running?) }
# we've entered into a non-running state, so we exit
# note: need to restart this thread after a fork in parent
- if @client_state != RUNNING
+ unless running?
logger.debug { "session failure watcher thread exiting, @client_state: #{@client_state}" }
return
end
# if we know that this session was valid once and it has now
@@ -455,12 +486,10 @@
logger.debug { "reopen returned: #{rv.inspect}" }
end
end
end
- ensure
- logger.debug { "reconnect thread exiting" }
end
def join_and_clear_reconnect_thread
return unless @reconnect_thread
begin
@@ -486,11 +515,11 @@
rescue Exceptions::Retryable => e
time_to_stop = Time.now + retry_duration
wait_until_connected_or_dying(retry_duration)
- if (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) || (Time.now > time_to_stop) || (@client_state != RUNNING)
+ if (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) || (Time.now > time_to_stop) || !running?
raise e
else
retry
end
end
@@ -515,13 +544,9 @@
@last_cnx_state and (@last_cnx_state >= 0)
end
def create_connection(*args)
::Zookeeper.new(*args)
- end
-
- def dead_or_dying?
- (@client_state == CLOSE_REQ) || (@client_state == CLOSED)
end
def unlocked_connect(opts={})
return if @cnx
timeout = opts.fetch(:timeout, @connection_timeout)