lib/zk/client/threaded.rb in zk-1.6.1 vs lib/zk/client/threaded.rb in zk-1.6.2
- old
+ new
@@ -198,11 +198,11 @@
end
# @option opts [Fixnum] :timeout how long we will wait for the connection
# to be established. If timeout is nil, we will wait forever: *use
# carefully*.
- def connect(opts={})
+ def connect(opts={})
@mutex.synchronize { unlocked_connect(opts) }
end
# (see Base#reopen)
def reopen(timeout=nil)
@@ -214,20 +214,18 @@
logger.debug { "reopening everything, fork detected!" }
setup_locks
- @pid = Process.pid
+ @pid = Process.pid
@client_state = RUNNING # reset state to running if we were paused
old_cnx, @cnx = @cnx, nil
old_cnx.close! if old_cnx # && !old_cnx.closed?
join_and_clear_reconnect_thread
- @last_cnx_state = nil
-
@mutex.synchronize do
# it's important that we're holding the lock, as access to 'cnx' is
# synchronized, and we want to avoid a race where event handlers
# might see a nil connection. I've seen this exception occur *once*
# so it's pretty rare (it was on 1.8.7 too), but just to be double
@@ -243,12 +241,18 @@
if @client_state == PAUSED
# XXX: what to do in this case? does it matter?
end
logger.debug { "reopening, no fork detected" }
- @last_cnx_state = nil
+ @last_cnx_state = Zookeeper::ZOO_CONNECTING_STATE
@cnx.reopen(timeout) # ok, we werent' forked, so just reopen
+
+ # this is a bit of a hack, because we need to wait until the event thread
+ # delivers the connected event, which we used to be able to rely on just the
+ # connection doing. since we don't want to call the @cnx.state method to check
+ # (rather use the cached @last_cnx_state), we wait for consistency's sake
+ wait_until_connected_or_dying(timeout)
end
end
state
end
@@ -345,10 +349,41 @@
end
on_tpool ? shutdown_thread : shutdown_thread.join(30)
end
+ # this overrides the implementation in StateMixin
+ def connected?
+ @mutex.synchronize { running? && @last_cnx_state == Zookeeper::ZOO_CONNECTED_STATE }
+ end
+
+ def associating?
+ @mutex.synchronize { running? && @last_cnx_state == Zookeeper::ZOO_ASSOCIATING_STATE }
+ end
+
+ def connecting?
+ @mutex.synchronize { running? && @last_cnx_state == Zookeeper::ZOO_CONNECTING_STATE }
+ end
+
+ def expired_session?
+ @mutex.synchronize do
+ return false unless @cnx and running?
+
+ if defined?(::JRUBY_VERSION)
+ !@cnx.state.alive?
+ else
+ @last_cnx_state == Zookeeper::ZOO_EXPIRED_SESSION_STATE
+ end
+ end
+ end
+
+ def state
+ @mutex.synchronize do
+ STATE_SYM_MAP.fetch(@last_cnx_state) { |k| raise IndexError, "unrecognized state: #{k.inspect}" }
+ end
+ end
+
# {see ZK::Client::Base#close}
def close
super
subs, @fork_subs = @fork_subs, []
subs.each(&:unsubscribe)
@@ -405,24 +440,51 @@
logger.error { "BUG: Exception caught in raw_event_handler: #{e.to_std_format}" }
end
# @private
def wait_until_connected_or_dying(timeout)
- time_to_stop = Time.now + timeout
+ time_to_stop = timeout ? Time.now + timeout : nil
@mutex.synchronize do
while true
- now = Time.now
- break if (@last_cnx_state == Zookeeper::ZOO_CONNECTED_STATE) || (now > time_to_stop) || (@client_state != RUNNING)
- deadline = time_to_stop.to_f - now.to_f
- @cond.wait(deadline)
+ if timeout
+ now = Time.now
+ break if (@last_cnx_state == Zookeeper::ZOO_CONNECTED_STATE) || (now > time_to_stop) || (@client_state != RUNNING)
+ deadline = time_to_stop.to_f - now.to_f
+ @cond.wait(deadline)
+ else
+ break if (@last_cnx_state == Zookeeper::ZOO_CONNECTED_STATE) || (@client_state != RUNNING)
+ @cond.wait
+ end
end
+ end
- logger.debug { "#{__method__} @last_cnx_state: #{@last_cnx_state.inspect}, time_left? #{Time.now.to_f < time_to_stop.to_f}, @client_state: #{@client_state.inspect}" }
+ logger.debug { "#{__method__} @last_cnx_state: #{@last_cnx_state.inspect}, time_left? #{timeout ? (Time.now.to_f < time_to_stop.to_f) : 'true'}, @client_state: #{@client_state.inspect}" }
+ end
+
+ # @private
+ def wait_until_closed(timeout=nil)
+ time_to_stop = timeout ? Time.now + timeout : nil
+
+ @mutex.synchronize do
+ while true
+ if timeout
+ now = Time.now
+ break if (now > time_to_stop) || (@client_state == CLOSED)
+ deadline = time_to_stop.to_f - now.to_f
+ @cond.wait(deadline)
+ else
+ break if @client_state == CLOSED
+ @cond.wait
+ end
+ end
end
+
+ logger.debug { "#{__method__} @last_cnx_state: #{@last_cnx_state.inspect}, time_left? #{timeout ? (Time.now.to_f < time_to_stop.to_f) : 'true'}, @client_state: #{@client_state.inspect}" }
end
+
# @private
def client_state
@mutex.synchronize { @client_state }
end
@@ -549,11 +611,28 @@
end
def unlocked_connect(opts={})
return if @cnx
timeout = opts.fetch(:timeout, @connection_timeout)
+
+ # this is a little bit of a lie, but is the legitimate state we're in when we first
+ # create the connection.
+ @last_cnx_state = Zookeeper::ZOO_CONNECTING_STATE
+
@cnx = create_connection(@host, timeout, @event_handler.get_default_watcher_block)
+
spawn_reconnect_thread
+
+ # this is a bit of a hack, because we need to wait until the event thread
+ # delivers the connected event, which we used to be able to rely on just the
+ # connection doing. since we don't want to call the @cnx.state method to check
+ # (rather use the cached @last_cnx_state), we wait for consistency's sake
+ #
+ # NOTE: this may cause issues later if we move to using non-reentrant locks
+ # TODO: this may wind up causing the whole process to take longer
+ # than `timeout` to complete, we should probably be using a difference
+ # (i.e. time-to-go) here
+ wait_until_connected_or_dying(timeout)
end
- end
- end
-end
+ end # Threaded
+ end # Client
+end # ZK