lib/zk/client/threaded.rb in zk-1.4.2 vs lib/zk/client/threaded.rb in zk-1.5.0

- old
+ new

@@ -138,13 +138,20 @@ @event_handler = EventHandler.new(self, opts) @reconnect = opts.fetch(:reconnect, true) @mutex = Monitor.new + @cond = @mutex.new_cond - @close_requested = false + @cli_state = :running # this is to distinguish between *our* state and the underlying connection state + @fork_subs = [ + ForkHook.prepare_for_fork(method(:pause_before_fork_in_parent)), + ForkHook.after_fork_in_parent(method(:resume_after_fork_in_parent)), + ForkHook.after_fork_in_child(method(:reopen)), + ] + yield self if block_given? @mutex.synchronize do connect if opts.fetch(:connect, true) end @@ -167,41 +174,86 @@ # stuff because we're the only thread. if forked? # ok, just to sanity check here raise "[BUG] we hit the fork-reopening code in JRuby!!" if defined?(::JRUBY_VERSION) - logger.debug { "#{self.class}##{__method__} reopening everything, fork detected!" } + logger.debug { "reopening everything, fork detected!" } @mutex = Monitor.new - @threadpool.reopen_after_fork! # prune dead threadpool threads after a fork() - @event_handler.reopen_after_fork! @pid = Process.pid + @cli_state = :running # reset state to running if we were paused old_cnx, @cnx = @cnx, nil old_cnx.close! if old_cnx # && !old_cnx.closed? + + @mutex.synchronize do + # it's important that we're holding the lock, as access to 'cnx' is + # synchronized, and we want to avoid a race where event handlers + # might see a nil connection. I've seen this exception occur *once* + # so it's pretty rare (it was on 1.8.7 too), but just to be double + # extra paranoid + + @event_handler.reopen_after_fork! + @threadpool.reopen_after_fork! # prune dead threadpool threads after a fork() + + connect + end else - logger.debug { "#{self.class}##{__method__} not reopening, no fork detected" } - @cnx.reopen(timeout) + @mutex.synchronize do + if @cli_state == :paused + # XXX: what to do in this case? does it matter? 
+ end + + logger.debug { "reopening, no fork detected" } + @cnx.reopen(timeout) # ok, we weren't forked, so just reopen + end end - @mutex.synchronize { @close_requested = false } - connect state end + # Before forking, call this method to perform a "stop the world" operation on all + # objects associated with this connection. This means that this client will spin down + # and join all threads (so make sure none of your callbacks will block forever), + # and will take no action to keep the session alive. With the default settings, + # if a ping is not received within 20 seconds, the session is considered dead + # and must be re-established so be sure to call {#resume_after_fork_in_parent} + # before that deadline, or you will have to re-establish your session. + # + # @raise [InvalidStateError] when called and not in running? state + def pause_before_fork_in_parent + @mutex.synchronize do + raise InvalidStateError, "client must be running? when you call #{__method__}" unless running? + @cli_state = :paused + end + logger.debug { "#{self.class}##{__method__}" } + [@event_handler, @threadpool, @cnx].each(&:pause_before_fork_in_parent) + end + + def resume_after_fork_in_parent + @mutex.synchronize do + raise InvalidStateError, "client must be paused? when you call #{__method__}" unless paused? + @cli_state = :running + end + + logger.debug { "#{self.class}##{__method__}" } + [@cnx, @event_handler, @threadpool].each(&:resume_after_fork_in_parent) + end + # (see Base#close!) # # @note We will make our best effort to do the right thing if you call # this method while in the threadpool. It is _a much better idea_ to # call us from the main thread, or _at least_ a thread we're not going # to be trying to shut down as part of closing the connection and # threadpool. # def close! 
@mutex.synchronize do - return if @close_requested - @close_requested = true + return if [:closed, :close_requested].include?(@cli_state) +# logger.debug { "moving to :close_requested state" } + @cli_state = :close_requested end on_tpool = on_threadpool? # Ok, so the threadpool will wait up to N seconds while joining each thread. @@ -211,22 +263,28 @@ # # if the user *doesn't* hate us, then we just join the shutdown_thread immediately # and wait for it to exit # shutdown_thread = Thread.new do - @threadpool.shutdown(2) + @threadpool.shutdown(10) super + + @mutex.synchronize do +# logger.debug { "moving to :closed state" } + @cli_state = :closed + end end - shutdown_thread.join unless on_tpool - - nil + on_tpool ? shutdown_thread : shutdown_thread.join end # (see Base#close) def close super + subs, @fork_subs = @fork_subs, [] + subs.each(&:unsubscribe) + nil end # (see Threadpool#on_threadpool?) def on_threadpool? @threadpool and @threadpool.on_threadpool? @@ -243,27 +301,59 @@ if event.client_invalid? return unless @reconnect @mutex.synchronize do - unless @close_requested # a legitimate shutdown case + unless dead_or_dying? # a legitimate shutdown case logger.error { "Got event #{event.state_name}, calling reopen(0)! things may be messed up until this works itself out!" } # reopen(0) means that we don't want to wait for the connection - # to reach the connected state before returning + # to reach the connected state before returning as we're on the + # event thread. reopen(0) end end end rescue Exception => e logger.error { "BUG: Exception caught in raw_event_handler: #{e.to_std_format}" } end + def closed? + return true if @mutex.synchronize { @cli_state == :closed } + super + end + + # are we in running (not-paused) state? + def running? + @mutex.synchronize { @cli_state == :running } + end + + # are we in paused state? + def paused? + @mutex.synchronize { @cli_state == :paused } + end + + # has shutdown time arrived? + def close_requested? 
+ @mutex.synchronize { @cli_state == :close_requested } + end + protected + # in the threaded version of the client, synchronize access around cnx + # so that callers don't wind up with a nil object when we're in the middle + # of reopening it + def cnx + @mutex.synchronize { @cnx } + end + # @private def create_connection(*args) ::Zookeeper.new(*args) end - end + + def dead_or_dying? + (@cli_state == :close_requested) || (@cli_state == :closed) + end + end end end