lib/zk/client/threaded.rb in zk-1.5.2 vs lib/zk/client/threaded.rb in zk-1.5.3

- old
+ new

@@ -43,14 +43,14 @@ DEFAULT_THREADPOOL_SIZE = 1 # @private module Constants - CLI_RUNNING = :running - CLI_PAUSED = :paused - CLI_CLOSE_REQ = :close_requested - CLI_CLOSED = :closed + RUNNING = :running + PAUSED = :paused + CLOSE_REQ = :close_requested + CLOSED = :closed end include Constants # Construct a new threaded client. # @@ -159,17 +159,16 @@ @connection_timeout = opts.fetch(:timeout, DEFAULT_TIMEOUT) # maybe move this into superclass? @event_handler = EventHandler.new(self, opts) @reconnect = opts.fetch(:reconnect, true) - @mutex = Monitor.new - @cond = @mutex.new_cond + setup_locks - @cli_state = CLI_RUNNING # this is to distinguish between *our* state and the underlying connection state + @client_state = RUNNING # this is to distinguish between *our* state and the underlying connection state # this is the last status update we've received from the underlying connection - @last_cnx_state = Zookeeper::ZOO_CLOSED_STATE + @last_cnx_state = nil @retry_duration = opts.fetch(:retry_duration, nil).to_i yield self if block_given? @@ -181,10 +180,19 @@ ObjectSpace.define_finalizer(self, self.class.finalizer(@fork_subs)) connect if opts.fetch(:connect, true) end + + # ensure that the initializer and the reopen code set up the mutexes + # the same way (i.e. use a Monitor or a Mutex, no, really, I screwed + # this up once) + def setup_locks + @mutex = Monitor.new + @cond = @mutex.new_cond + end + private :setup_locks # @private def self.finalizer(hooks) proc { hooks.each(&:unregister) } end @@ -204,18 +212,22 @@ # ok, just to sanity check here raise "[BUG] we hit the fork-reopening code in JRuby!!" if defined?(::JRUBY_VERSION) logger.debug { "reopening everything, fork detected!" } - @mutex = Mutex.new - @cond = ConditionVariable.new + setup_locks + @pid = Process.pid - @cli_state = CLI_RUNNING # reset state to running if we were paused + @client_state = RUNNING # reset state to running if we were paused old_cnx, @cnx = @cnx, nil old_cnx.close! if old_cnx # && !old_cnx.closed? + join_and_clear_reconnect_thread + + @last_cnx_state = nil + @mutex.synchronize do # it's important that we're holding the lock, as access to 'cnx' is # synchronized, and we want to avoid a race where event handlers # might see a nil connection. I've seen this exception occur *once* # so it's pretty rare (it was on 1.8.7 too), but just to be double @@ -226,15 +238,16 @@ unlocked_connect end else @mutex.synchronize do - if @cli_state == CLI_PAUSED + if @client_state == PAUSED # XXX: what to do in this case? does it matter? end logger.debug { "reopening, no fork detected" } + @last_cnx_state = nil @cnx.reopen(timeout) # ok, we werent' forked, so just reopen end end state @@ -250,35 +263,42 @@ # # @raise [InvalidStateError] when called and not in running? state # @private def pause_before_fork_in_parent @mutex.synchronize do - raise InvalidStateError, "client must be running? when you call #{__method__}" unless (@cli_state == CLI_RUNNING) - @cli_state = CLI_PAUSED + raise InvalidStateError, "client must be running? when you call #{__method__}" unless (@client_state == RUNNING) + @client_state = PAUSED logger.debug { "#{self.class}##{__method__}" } @cond.broadcast end + join_and_clear_reconnect_thread + # the compact is here because the @cnx *may* be nil when this callback is fired by the # ForkHook (in the case of ZK.open). The race is between the GC calling the finalizer [@event_handler, @threadpool, @cnx].compact.each(&:pause_before_fork_in_parent) ensure - logger.debug { "#{self.class}##{__method__} returning" } + logger.debug { "##{__method__} returning" } end # @private def resume_after_fork_in_parent @mutex.synchronize do - raise InvalidStateError, "client must be paused? when you call #{__method__}" unless (@cli_state == CLI_PAUSED) - @cli_state = CLI_RUNNING + raise InvalidStateError, "client must be paused? when you call #{__method__}" unless (@client_state == PAUSED) + @client_state = RUNNING - logger.debug { "#{self.class}##{__method__}" } + logger.debug { "##{__method__}" } - [@cnx, @event_handler, @threadpool].compact.each(&:resume_after_fork_in_parent) + if @cnx + @cnx.resume_after_fork_in_parent + spawn_reconnect_thread + end + [@event_handler, @threadpool].compact.each(&:resume_after_fork_in_parent) + @cond.broadcast end end # (see Base#close!) @@ -289,16 +309,18 @@ # to be trying to shut down as part of closing the connection and # threadpool. # def close! @mutex.synchronize do - return if [:closed, :close_requested].include?(@cli_state) + return if [:closed, :close_requested].include?(@client_state) logger.debug { "moving to :close_requested state" } - @cli_state = CLI_CLOSE_REQ + @client_state = CLOSE_REQ @cond.broadcast end + join_and_clear_reconnect_thread + on_tpool = on_threadpool? # Ok, so the threadpool will wait up to N seconds while joining each thread. # If _we're on a threadpool thread_, have it wait until we're ready to jump # out of this method, and tell it to wait up to 5 seconds to let us get @@ -306,18 +328,20 @@ # # if the user *doesn't* hate us, then we just join the shutdown_thread immediately # and wait for it to exit # shutdown_thread = Thread.new do + Thread.current[:name] = 'shutdown' @threadpool.shutdown(10) # this will call #close super @mutex.synchronize do logger.debug { "moving to :closed state" } - @cli_state = CLI_CLOSED + @client_state = CLOSED + @last_cnx_state = nil @cond.broadcast end end on_tpool ? shutdown_thread : shutdown_thread.join(30) @@ -346,97 +370,165 @@ return unless event.session_event? @mutex.synchronize do @last_cnx_state = event.state - if event.client_invalid? and @reconnect and not dead_or_dying? - logger.error { "Got event #{event.state_name}, calling reopen(0)! things may be messed up until this works itself out!" } - - # reopen(0) means that we don't want to wait for the connection - # to reach the connected state before returning as we're on the - # event thread. - reopen(0) - end - @cond.broadcast # wake anyone waiting for a connection state update end rescue Exception => e logger.error { "BUG: Exception caught in raw_event_handler: #{e.to_std_format}" } end def closed? - return true if @mutex.synchronize { @cli_state == CLI_CLOSED } + return true if @mutex.synchronize { @client_state == CLOSED } super end # are we in running (not-paused) state? + # @private def running? - @mutex.synchronize { @cli_state == CLI_RUNNING } + @mutex.synchronize { @client_state == RUNNING } end # are we in paused state? + # @private def paused? - @mutex.synchronize { @cli_state == CLI_PAUSED } + @mutex.synchronize { @client_state == PAUSED } end # has shutdown time arrived? + # @private def close_requested? - @mutex.synchronize { @cli_state == CLI_CLOSE_REQ } + @mutex.synchronize { @client_state == CLOSE_REQ } end - protected + # @private + def wait_until_connected_or_dying(timeout) + time_to_stop = Time.now + timeout + + @mutex.synchronize do + while (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) && (Time.now < time_to_stop) && (@client_state == RUNNING) + @cond.wait(timeout) + end + + logger.debug { "@last_cnx_state: #{@last_cnx_state.inspect}, time_left? #{Time.now.to_f < time_to_stop.to_f}, @client_state: #{@client_state.inspect}" } + end + end + + private + # this is just here so we can see it in stack traces + def reopen_after_session_expired + reopen + end + # in the threaded version of the client, synchronize access around cnx # so that callers don't wind up with a nil object when we're in the middle # of reopening it def cnx @mutex.synchronize { @cnx } end + def reconnect_thread_body + Thread.current[:name] = 'reconnect' + while @reconnect # too clever? + @mutex.synchronize do + # either we havne't seen a valid session update from this + # connection yet, or we're doing fine, so just wait + @cond.wait_while { !seen_session_state_event? or (valid_session_state? and (@client_state == RUNNING)) } + + # we've entered into a non-running state, so we exit + # note: need to restart this thread after a fork in parent + if @client_state != RUNNING + logger.debug { "session failure watcher thread exiting, @client_state: #{@client_state}" } + return + end + + # if we know that this session was valid once and it has now + # become invalid we call reopen + # + if seen_session_state_event? and not valid_session_state? + logger.debug { "session state was invalid, calling reopen" } + + # reopen will reset @last_cnx_state so that + # seen_session_state_event? will return false until the first + # event has been delivered on the new connection + rv = reopen_after_session_expired + + logger.debug { "reopen returned: #{rv.inspect}" } + end + end + end + ensure + logger.debug { "reconnect thread exiting" } + end + + def join_and_clear_reconnect_thread + return unless @reconnect_thread + begin + # this should never time out but, just to make sure we don't hang forever + unless @reconnect_thread.join(30) + logger.error { "timed out waiting for reconnect thread to join! something is hosed!" } + end + rescue Exception => e + logger.error { "caught exception joining reconnect thread" } + logger.error { e.to_std_format } + end + @reconnect_thread = nil + end + + def spawn_reconnect_thread + @reconnect_thread ||= Thread.new(&method(:reconnect_thread_body)) + end + def call_and_check_rc(meth, opts) if retry_duration = (opts.delete(:retry_duration) || @retry_duration) begin super(meth, opts) rescue Exceptions::Retryable => e time_to_stop = Time.now + retry_duration wait_until_connected_or_dying(retry_duration) - if (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) || (Time.now > time_to_stop) || (@cli_state != CLI_RUNNING) + if (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) || (Time.now > time_to_stop) || (@client_state != RUNNING) raise e else retry end end else super end end - def wait_until_connected_or_dying(timeout) - time_to_stop = Time.now + timeout + # have we gotten a status event for the current connection? + # this method is not synchronized + def seen_session_state_event? + !!@last_cnx_state + end - @mutex.synchronize do - while (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) && (Time.now < time_to_stop) && (@cli_state == CLI_RUNNING) - @cond.wait(timeout) - end - - logger.debug { "@last_cnx_state: #{@last_cnx_state}, time_left? #{Time.now.to_f < time_to_stop.to_f}, @cli_state: #{@cli_state.inspect}" } - end + # we've seen a session state from the cnx, and it was not "omg we're screwed" + # will return false if we havne't gotten a session event yet + # + # this method is not synchronized + def valid_session_state? + # this is kind of icky, but the SESSION_INVALID and AUTH_FAILED states + # are both negative numbers + @last_cnx_state and (@last_cnx_state >= 0) end def create_connection(*args) ::Zookeeper.new(*args) end def dead_or_dying? - (@cli_state == CLI_CLOSE_REQ) || (@cli_state == CLI_CLOSED) + (@client_state == CLOSE_REQ) || (@client_state == CLOSED) end - private def unlocked_connect(opts={}) return if @cnx timeout = opts.fetch(:timeout, @connection_timeout) @cnx = create_connection(@host, timeout, @event_handler.get_default_watcher_block) + spawn_reconnect_thread end end end end