lib/zk/client/threaded.rb in zk-1.6.1 vs lib/zk/client/threaded.rb in zk-1.6.2

- old
+ new

@@ -198,11 +198,11 @@ end # @option opts [Fixnum] :timeout how long we will wait for the connection # to be established. If timeout is nil, we will wait forever: *use # carefully*. - def connect(opts={}) + def connect(opts={}) @mutex.synchronize { unlocked_connect(opts) } end # (see Base#reopen) def reopen(timeout=nil) @@ -214,20 +214,18 @@ logger.debug { "reopening everything, fork detected!" } setup_locks - @pid = Process.pid + @pid = Process.pid @client_state = RUNNING # reset state to running if we were paused old_cnx, @cnx = @cnx, nil old_cnx.close! if old_cnx # && !old_cnx.closed? join_and_clear_reconnect_thread - @last_cnx_state = nil - @mutex.synchronize do # it's important that we're holding the lock, as access to 'cnx' is # synchronized, and we want to avoid a race where event handlers # might see a nil connection. I've seen this exception occur *once* # so it's pretty rare (it was on 1.8.7 too), but just to be double @@ -243,12 +241,18 @@ if @client_state == PAUSED # XXX: what to do in this case? does it matter? end logger.debug { "reopening, no fork detected" } - @last_cnx_state = nil + @last_cnx_state = Zookeeper::ZOO_CONNECTING_STATE @cnx.reopen(timeout) # ok, we werent' forked, so just reopen + + # this is a bit of a hack, because we need to wait until the event thread + # delivers the connected event, which we used to be able to rely on just the + # connection doing. since we don't want to call the @cnx.state method to check + # (rather use the cached @last_cnx_state), we wait for consistency's sake + wait_until_connected_or_dying(timeout) end end state end @@ -345,10 +349,41 @@ end on_tpool ? shutdown_thread : shutdown_thread.join(30) end + # this overrides the implementation in StateMixin + def connected? + @mutex.synchronize { running? && @last_cnx_state == Zookeeper::ZOO_CONNECTED_STATE } + end + + def associating? + @mutex.synchronize { running? && @last_cnx_state == Zookeeper::ZOO_ASSOCIATING_STATE } + end + + def connecting? + @mutex.synchronize { running? && @last_cnx_state == Zookeeper::ZOO_CONNECTING_STATE } + end + + def expired_session? + @mutex.synchronize do + return false unless @cnx and running? + + if defined?(::JRUBY_VERSION) + !@cnx.state.alive? + else + @last_cnx_state == Zookeeper::ZOO_EXPIRED_SESSION_STATE + end + end + end + + def state + @mutex.synchronize do + STATE_SYM_MAP.fetch(@last_cnx_state) { |k| raise IndexError, "unrecognized state: #{k.inspect}" } + end + end + # {see ZK::Client::Base#close} def close super subs, @fork_subs = @fork_subs, [] subs.each(&:unsubscribe) @@ -405,24 +440,51 @@ logger.error { "BUG: Exception caught in raw_event_handler: #{e.to_std_format}" } end # @private def wait_until_connected_or_dying(timeout) - time_to_stop = Time.now + timeout + time_to_stop = timeout ? Time.now + timeout : nil @mutex.synchronize do while true - now = Time.now - break if (@last_cnx_state == Zookeeper::ZOO_CONNECTED_STATE) || (now > time_to_stop) || (@client_state != RUNNING) - deadline = time_to_stop.to_f - now.to_f - @cond.wait(deadline) + if timeout + now = Time.now + break if (@last_cnx_state == Zookeeper::ZOO_CONNECTED_STATE) || (now > time_to_stop) || (@client_state != RUNNING) + deadline = time_to_stop.to_f - now.to_f + @cond.wait(deadline) + else + break if (@last_cnx_state == Zookeeper::ZOO_CONNECTED_STATE) || (@client_state != RUNNING) + @cond.wait + end end + end - logger.debug { "#{__method__} @last_cnx_state: #{@last_cnx_state.inspect}, time_left? #{Time.now.to_f < time_to_stop.to_f}, @client_state: #{@client_state.inspect}" } + logger.debug { "#{__method__} @last_cnx_state: #{@last_cnx_state.inspect}, time_left? #{timeout ? (Time.now.to_f < time_to_stop.to_f) : 'true'}, @client_state: #{@client_state.inspect}" } + end + + # @private + def wait_until_closed(timeout=nil) + time_to_stop = timeout ? Time.now + timeout : nil + + @mutex.synchronize do + while true + if timeout + now = Time.now + break if (now > time_to_stop) || (@client_state == CLOSED) + deadline = time_to_stop.to_f - now.to_f + @cond.wait(deadline) + else + break if @client_state == CLOSED + @cond.wait + end + end end + + logger.debug { "#{__method__} @last_cnx_state: #{@last_cnx_state.inspect}, time_left? #{timeout ? (Time.now.to_f < time_to_stop.to_f) : 'true'}, @client_state: #{@client_state.inspect}" } end + # @private def client_state @mutex.synchronize { @client_state } end @@ -549,11 +611,28 @@ end def unlocked_connect(opts={}) return if @cnx timeout = opts.fetch(:timeout, @connection_timeout) + + # this is a little bit of a lie, but is the legitimate state we're in when we first + # create the connection. + @last_cnx_state = Zookeeper::ZOO_CONNECTING_STATE + @cnx = create_connection(@host, timeout, @event_handler.get_default_watcher_block) + spawn_reconnect_thread + + # this is a bit of a hack, because we need to wait until the event thread + # delivers the connected event, which we used to be able to rely on just the + # connection doing. since we don't want to call the @cnx.state method to check + # (rather use the cached @last_cnx_state), we wait for consistency's sake + # + # NOTE: this may cause issues later if we move to using non-reentrant locks + # TODO: this may wind up causing the whole process to take longer + # than `timeout` to complete, we should probably be using a difference + # (i.e. time-to-go) here + wait_until_connected_or_dying(timeout) end - end - end -end + end # Threaded + end # Client +end # ZK