lib/zk/client/threaded.rb in zk-1.4.2 vs lib/zk/client/threaded.rb in zk-1.5.0
- old
+ new
@@ -138,13 +138,20 @@
@event_handler = EventHandler.new(self, opts)
@reconnect = opts.fetch(:reconnect, true)
@mutex = Monitor.new
+ @cond = @mutex.new_cond
- @close_requested = false
+ @cli_state = :running # this is to distinguish between *our* state and the underlying connection state
+ @fork_subs = [
+ ForkHook.prepare_for_fork(method(:pause_before_fork_in_parent)),
+ ForkHook.after_fork_in_parent(method(:resume_after_fork_in_parent)),
+ ForkHook.after_fork_in_child(method(:reopen)),
+ ]
+
yield self if block_given?
@mutex.synchronize do
connect if opts.fetch(:connect, true)
end
@@ -167,41 +174,86 @@
# stuff because we're the only thread.
if forked?
# ok, just to sanity check here
raise "[BUG] we hit the fork-reopening code in JRuby!!" if defined?(::JRUBY_VERSION)
- logger.debug { "#{self.class}##{__method__} reopening everything, fork detected!" }
+ logger.debug { "reopening everything, fork detected!" }
@mutex = Monitor.new
- @threadpool.reopen_after_fork! # prune dead threadpool threads after a fork()
- @event_handler.reopen_after_fork!
@pid = Process.pid
+ @cli_state = :running # reset state to running if we were paused
old_cnx, @cnx = @cnx, nil
old_cnx.close! if old_cnx # && !old_cnx.closed?
+
+ @mutex.synchronize do
+ # it's important that we're holding the lock, as access to 'cnx' is
+ # synchronized, and we want to avoid a race where event handlers
+ # might see a nil connection. I've seen this exception occur *once*
+ # so it's pretty rare (it was on 1.8.7 too), but just to be double
+ # extra paranoid
+
+ @event_handler.reopen_after_fork!
+ @threadpool.reopen_after_fork! # prune dead threadpool threads after a fork()
+
+ connect
+ end
else
- logger.debug { "#{self.class}##{__method__} not reopening, no fork detected" }
- @cnx.reopen(timeout)
+ @mutex.synchronize do
+ if @cli_state == :paused
+ # XXX: what to do in this case? does it matter?
+ end
+
+ logger.debug { "reopening, no fork detected" }
+ @cnx.reopen(timeout) # ok, we weren't forked, so just reopen
+ end
end
- @mutex.synchronize { @close_requested = false }
- connect
state
end
+ # Before forking, call this method to perform a "stop the world" operation on all
+ # objects associated with this connection. This means that this client will spin down
+ # and join all threads (so make sure none of your callbacks will block forever),
+ # and will take no action to keep the session alive. With the default settings,
+ # if a ping is not received within 20 seconds, the session is considered dead
+ # and must be re-established so be sure to call {#resume_after_fork_in_parent}
+ # before that deadline, or you will have to re-establish your session.
+ #
+ # @raise [InvalidStateError] when called and not in running? state
+ def pause_before_fork_in_parent
+ @mutex.synchronize do
+ raise InvalidStateError, "client must be running? when you call #{__method__}" unless running?
+ @cli_state = :paused
+ end
+ logger.debug { "#{self.class}##{__method__}" }
+ [@event_handler, @threadpool, @cnx].each(&:pause_before_fork_in_parent)
+ end
+
+ def resume_after_fork_in_parent
+ @mutex.synchronize do
+ raise InvalidStateError, "client must be paused? when you call #{__method__}" unless paused?
+ @cli_state = :running
+ end
+
+ logger.debug { "#{self.class}##{__method__}" }
+ [@cnx, @event_handler, @threadpool].each(&:resume_after_fork_in_parent)
+ end
+
# (see Base#close!)
#
# @note We will make our best effort to do the right thing if you call
# this method while in the threadpool. It is _a much better idea_ to
# call us from the main thread, or _at least_ a thread we're not going
# to be trying to shut down as part of closing the connection and
# threadpool.
#
def close!
@mutex.synchronize do
- return if @close_requested
- @close_requested = true
+ return if [:closed, :close_requested].include?(@cli_state)
+# logger.debug { "moving to :close_requested state" }
+ @cli_state = :close_requested
end
on_tpool = on_threadpool?
# Ok, so the threadpool will wait up to N seconds while joining each thread.
@@ -211,22 +263,28 @@
#
# if the user *doesn't* hate us, then we just join the shutdown_thread immediately
# and wait for it to exit
#
shutdown_thread = Thread.new do
- @threadpool.shutdown(2)
+ @threadpool.shutdown(10)
super
+
+ @mutex.synchronize do
+# logger.debug { "moving to :closed state" }
+ @cli_state = :closed
+ end
end
- shutdown_thread.join unless on_tpool
-
- nil
+ on_tpool ? shutdown_thread : shutdown_thread.join
end
# (see Base#close)
def close
super
+ subs, @fork_subs = @fork_subs, []
+ subs.each(&:unsubscribe)
+ nil
end
# (see Threadpool#on_threadpool?)
def on_threadpool?
@threadpool and @threadpool.on_threadpool?
@@ -243,27 +301,59 @@
if event.client_invalid?
return unless @reconnect
@mutex.synchronize do
- unless @close_requested # a legitimate shutdown case
+ unless dead_or_dying? # a legitimate shutdown case
logger.error { "Got event #{event.state_name}, calling reopen(0)! things may be messed up until this works itself out!" }
# reopen(0) means that we don't want to wait for the connection
- # to reach the connected state before returning
+ # to reach the connected state before returning as we're on the
+ # event thread.
reopen(0)
end
end
end
rescue Exception => e
logger.error { "BUG: Exception caught in raw_event_handler: #{e.to_std_format}" }
end
+ def closed?
+ return true if @mutex.synchronize { @cli_state == :closed }
+ super
+ end
+
+ # are we in running (not-paused) state?
+ def running?
+ @mutex.synchronize { @cli_state == :running }
+ end
+
+ # are we in paused state?
+ def paused?
+ @mutex.synchronize { @cli_state == :paused }
+ end
+
+ # has shutdown time arrived?
+ def close_requested?
+ @mutex.synchronize { @cli_state == :close_requested }
+ end
+
protected
+ # in the threaded version of the client, synchronize access around cnx
+ # so that callers don't wind up with a nil object when we're in the middle
+ # of reopening it
+ def cnx
+ @mutex.synchronize { @cnx }
+ end
+
# @private
def create_connection(*args)
::Zookeeper.new(*args)
end
- end
+
+ def dead_or_dying?
+ (@cli_state == :close_requested) || (@cli_state == :closed)
+ end
+ end
end
end