lib/zk/client/threaded.rb in zk-1.5.2 vs lib/zk/client/threaded.rb in zk-1.5.3
- old
+ new
@@ -43,14 +43,14 @@
DEFAULT_THREADPOOL_SIZE = 1
# @private
module Constants
- CLI_RUNNING = :running
- CLI_PAUSED = :paused
- CLI_CLOSE_REQ = :close_requested
- CLI_CLOSED = :closed
+ RUNNING = :running
+ PAUSED = :paused
+ CLOSE_REQ = :close_requested
+ CLOSED = :closed
end
include Constants
# Construct a new threaded client.
#
@@ -159,17 +159,16 @@
@connection_timeout = opts.fetch(:timeout, DEFAULT_TIMEOUT) # maybe move this into superclass?
@event_handler = EventHandler.new(self, opts)
@reconnect = opts.fetch(:reconnect, true)
- @mutex = Monitor.new
- @cond = @mutex.new_cond
+ setup_locks
- @cli_state = CLI_RUNNING # this is to distinguish between *our* state and the underlying connection state
+ @client_state = RUNNING # this is to distinguish between *our* state and the underlying connection state
# this is the last status update we've received from the underlying connection
- @last_cnx_state = Zookeeper::ZOO_CLOSED_STATE
+ @last_cnx_state = nil
@retry_duration = opts.fetch(:retry_duration, nil).to_i
yield self if block_given?
@@ -181,10 +180,19 @@
ObjectSpace.define_finalizer(self, self.class.finalizer(@fork_subs))
connect if opts.fetch(:connect, true)
end
+
+ # ensure that the initializer and the reopen code set up the mutexes
+ # the same way (i.e. use a Monitor or a Mutex, no, really, I screwed
+ # this up once)
+ def setup_locks
+ @mutex = Monitor.new
+ @cond = @mutex.new_cond
+ end
+ private :setup_locks
# @private
def self.finalizer(hooks)
proc { hooks.each(&:unregister) }
end
@@ -204,18 +212,22 @@
# ok, just to sanity check here
raise "[BUG] we hit the fork-reopening code in JRuby!!" if defined?(::JRUBY_VERSION)
logger.debug { "reopening everything, fork detected!" }
- @mutex = Mutex.new
- @cond = ConditionVariable.new
+ setup_locks
+
@pid = Process.pid
- @cli_state = CLI_RUNNING # reset state to running if we were paused
+ @client_state = RUNNING # reset state to running if we were paused
old_cnx, @cnx = @cnx, nil
old_cnx.close! if old_cnx # && !old_cnx.closed?
+ join_and_clear_reconnect_thread
+
+ @last_cnx_state = nil
+
@mutex.synchronize do
# it's important that we're holding the lock, as access to 'cnx' is
# synchronized, and we want to avoid a race where event handlers
# might see a nil connection. I've seen this exception occur *once*
# so it's pretty rare (it was on 1.8.7 too), but just to be double
@@ -226,15 +238,16 @@
unlocked_connect
end
else
@mutex.synchronize do
- if @cli_state == CLI_PAUSED
+ if @client_state == PAUSED
# XXX: what to do in this case? does it matter?
end
logger.debug { "reopening, no fork detected" }
+ @last_cnx_state = nil
@cnx.reopen(timeout) # ok, we weren't forked, so just reopen
end
end
state
@@ -250,35 +263,42 @@
#
# @raise [InvalidStateError] when called and not in running? state
# @private
def pause_before_fork_in_parent
@mutex.synchronize do
- raise InvalidStateError, "client must be running? when you call #{__method__}" unless (@cli_state == CLI_RUNNING)
- @cli_state = CLI_PAUSED
+ raise InvalidStateError, "client must be running? when you call #{__method__}" unless (@client_state == RUNNING)
+ @client_state = PAUSED
logger.debug { "#{self.class}##{__method__}" }
@cond.broadcast
end
+ join_and_clear_reconnect_thread
+
# the compact is here because the @cnx *may* be nil when this callback is fired by the
# ForkHook (in the case of ZK.open). The race is between the GC calling the finalizer
[@event_handler, @threadpool, @cnx].compact.each(&:pause_before_fork_in_parent)
ensure
- logger.debug { "#{self.class}##{__method__} returning" }
+ logger.debug { "##{__method__} returning" }
end
# @private
def resume_after_fork_in_parent
@mutex.synchronize do
- raise InvalidStateError, "client must be paused? when you call #{__method__}" unless (@cli_state == CLI_PAUSED)
- @cli_state = CLI_RUNNING
+ raise InvalidStateError, "client must be paused? when you call #{__method__}" unless (@client_state == PAUSED)
+ @client_state = RUNNING
- logger.debug { "#{self.class}##{__method__}" }
+ logger.debug { "##{__method__}" }
- [@cnx, @event_handler, @threadpool].compact.each(&:resume_after_fork_in_parent)
+ if @cnx
+ @cnx.resume_after_fork_in_parent
+ spawn_reconnect_thread
+ end
+ [@event_handler, @threadpool].compact.each(&:resume_after_fork_in_parent)
+
@cond.broadcast
end
end
# (see Base#close!)
@@ -289,16 +309,18 @@
# to be trying to shut down as part of closing the connection and
# threadpool.
#
def close!
@mutex.synchronize do
- return if [:closed, :close_requested].include?(@cli_state)
+ return if [:closed, :close_requested].include?(@client_state)
logger.debug { "moving to :close_requested state" }
- @cli_state = CLI_CLOSE_REQ
+ @client_state = CLOSE_REQ
@cond.broadcast
end
+ join_and_clear_reconnect_thread
+
on_tpool = on_threadpool?
# Ok, so the threadpool will wait up to N seconds while joining each thread.
# If _we're on a threadpool thread_, have it wait until we're ready to jump
# out of this method, and tell it to wait up to 5 seconds to let us get
@@ -306,18 +328,20 @@
#
# if the user *doesn't* hate us, then we just join the shutdown_thread immediately
# and wait for it to exit
#
shutdown_thread = Thread.new do
+ Thread.current[:name] = 'shutdown'
@threadpool.shutdown(10)
# this will call #close
super
@mutex.synchronize do
logger.debug { "moving to :closed state" }
- @cli_state = CLI_CLOSED
+ @client_state = CLOSED
+ @last_cnx_state = nil
@cond.broadcast
end
end
on_tpool ? shutdown_thread : shutdown_thread.join(30)
@@ -346,97 +370,165 @@
return unless event.session_event?
@mutex.synchronize do
@last_cnx_state = event.state
- if event.client_invalid? and @reconnect and not dead_or_dying?
- logger.error { "Got event #{event.state_name}, calling reopen(0)! things may be messed up until this works itself out!" }
-
- # reopen(0) means that we don't want to wait for the connection
- # to reach the connected state before returning as we're on the
- # event thread.
- reopen(0)
- end
-
@cond.broadcast # wake anyone waiting for a connection state update
end
rescue Exception => e
logger.error { "BUG: Exception caught in raw_event_handler: #{e.to_std_format}" }
end
def closed?
- return true if @mutex.synchronize { @cli_state == CLI_CLOSED }
+ return true if @mutex.synchronize { @client_state == CLOSED }
super
end
# are we in running (not-paused) state?
+ # @private
def running?
- @mutex.synchronize { @cli_state == CLI_RUNNING }
+ @mutex.synchronize { @client_state == RUNNING }
end
# are we in paused state?
+ # @private
def paused?
- @mutex.synchronize { @cli_state == CLI_PAUSED }
+ @mutex.synchronize { @client_state == PAUSED }
end
# has shutdown time arrived?
+ # @private
def close_requested?
- @mutex.synchronize { @cli_state == CLI_CLOSE_REQ }
+ @mutex.synchronize { @client_state == CLOSE_REQ }
end
- protected
+ # @private
+ def wait_until_connected_or_dying(timeout)
+ time_to_stop = Time.now + timeout
+
+ @mutex.synchronize do
+ while (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) && (Time.now < time_to_stop) && (@client_state == RUNNING)
+ @cond.wait(timeout)
+ end
+
+ logger.debug { "@last_cnx_state: #{@last_cnx_state.inspect}, time_left? #{Time.now.to_f < time_to_stop.to_f}, @client_state: #{@client_state.inspect}" }
+ end
+ end
+
+ private
+ # this is just here so we can see it in stack traces
+ def reopen_after_session_expired
+ reopen
+ end
+
# in the threaded version of the client, synchronize access around cnx
# so that callers don't wind up with a nil object when we're in the middle
# of reopening it
def cnx
@mutex.synchronize { @cnx }
end
+ def reconnect_thread_body
+ Thread.current[:name] = 'reconnect'
+ while @reconnect # too clever?
+ @mutex.synchronize do
+ # either we haven't seen a valid session update from this
+ # connection yet, or we're doing fine, so just wait
+ @cond.wait_while { !seen_session_state_event? or (valid_session_state? and (@client_state == RUNNING)) }
+
+ # we've entered into a non-running state, so we exit
+ # note: need to restart this thread after a fork in parent
+ if @client_state != RUNNING
+ logger.debug { "session failure watcher thread exiting, @client_state: #{@client_state}" }
+ return
+ end
+
+ # if we know that this session was valid once and it has now
+ # become invalid we call reopen
+ #
+ if seen_session_state_event? and not valid_session_state?
+ logger.debug { "session state was invalid, calling reopen" }
+
+ # reopen will reset @last_cnx_state so that
+ # seen_session_state_event? will return false until the first
+ # event has been delivered on the new connection
+ rv = reopen_after_session_expired
+
+ logger.debug { "reopen returned: #{rv.inspect}" }
+ end
+ end
+ end
+ ensure
+ logger.debug { "reconnect thread exiting" }
+ end
+
+ def join_and_clear_reconnect_thread
+ return unless @reconnect_thread
+ begin
+ # this should never time out but, just to make sure we don't hang forever
+ unless @reconnect_thread.join(30)
+ logger.error { "timed out waiting for reconnect thread to join! something is hosed!" }
+ end
+ rescue Exception => e
+ logger.error { "caught exception joining reconnect thread" }
+ logger.error { e.to_std_format }
+ end
+ @reconnect_thread = nil
+ end
+
+ def spawn_reconnect_thread
+ @reconnect_thread ||= Thread.new(&method(:reconnect_thread_body))
+ end
+
def call_and_check_rc(meth, opts)
if retry_duration = (opts.delete(:retry_duration) || @retry_duration)
begin
super(meth, opts)
rescue Exceptions::Retryable => e
time_to_stop = Time.now + retry_duration
wait_until_connected_or_dying(retry_duration)
- if (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) || (Time.now > time_to_stop) || (@cli_state != CLI_RUNNING)
+ if (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) || (Time.now > time_to_stop) || (@client_state != RUNNING)
raise e
else
retry
end
end
else
super
end
end
- def wait_until_connected_or_dying(timeout)
- time_to_stop = Time.now + timeout
+ # have we gotten a status event for the current connection?
+ # this method is not synchronized
+ def seen_session_state_event?
+ !!@last_cnx_state
+ end
- @mutex.synchronize do
- while (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) && (Time.now < time_to_stop) && (@cli_state == CLI_RUNNING)
- @cond.wait(timeout)
- end
-
- logger.debug { "@last_cnx_state: #{@last_cnx_state}, time_left? #{Time.now.to_f < time_to_stop.to_f}, @cli_state: #{@cli_state.inspect}" }
- end
+ # we've seen a session state from the cnx, and it was not "omg we're screwed"
+ # will return false if we haven't gotten a session event yet
+ #
+ # this method is not synchronized
+ def valid_session_state?
+ # this is kind of icky, but the SESSION_INVALID and AUTH_FAILED states
+ # are both negative numbers
+ @last_cnx_state and (@last_cnx_state >= 0)
end
def create_connection(*args)
::Zookeeper.new(*args)
end
def dead_or_dying?
- (@cli_state == CLI_CLOSE_REQ) || (@cli_state == CLI_CLOSED)
+ (@client_state == CLOSE_REQ) || (@client_state == CLOSED)
end
- private
def unlocked_connect(opts={})
return if @cnx
timeout = opts.fetch(:timeout, @connection_timeout)
@cnx = create_connection(@host, timeout, @event_handler.get_default_watcher_block)
+ spawn_reconnect_thread
end
end
end
end