lib/zk/client/threaded.rb in zk-1.5.0 vs lib/zk/client/threaded.rb in zk-1.5.1
- old
+ new
@@ -41,10 +41,19 @@
include Conveniences
include Logging
DEFAULT_THREADPOOL_SIZE = 1
+ # @private
+ module Constants
+ CLI_RUNNING = :running
+ CLI_PAUSED = :paused
+ CLI_CLOSE_REQ = :close_requested
+ CLI_CLOSED = :closed
+ end
+ include Constants
+
# Construct a new threaded client.
#
# Pay close attention to the `:threaded` option, and have a look at the
# [EventDeliveryModel](https://github.com/slyphon/zk/wiki/EventDeliveryModel)
# page in the wiki for a discussion of the relative advantages and
@@ -82,10 +91,23 @@
# connection. You *almost definately* want to leave this at the default.
# The only reason not to is if you already have a handler registered
# that does something application specific, and you want to avoid a
# conflict.
#
+ # @option opts [Fixnum] :retry_duration (nil) for how long (in seconds)
+ # should we wait to re-attempt a synchronous operation after we receive a
+ # ZK::Exceptions::Retryable error. This exception (or really, group of
+ # exceptions) is raised when there has been an unintentional network
+ # connection or session loss, so retrying an operation in this situation
+ # is like saying "If we are disconnected, How long should we wait for the
+ # connection to become available before attempthing this operation?"
+ #
+ # The default `nil` means automatic retry is not attempted.
+ #
+ # This is a global option, and will be used for all operations on this
+ # connection, however it can be overridden for any individual operation.
+ #
# @option opts [:single,:per_callback] :thread (:single) choose your event
# delivery model:
#
# * `:single`: There is one thread, and only one callback is called at
# a time. This is the default mode (for now), and will provide the most
@@ -138,36 +160,41 @@
@event_handler = EventHandler.new(self, opts)
@reconnect = opts.fetch(:reconnect, true)
@mutex = Monitor.new
- @cond = @mutex.new_cond
+ @cond = @mutex.new_cond
- @cli_state = :running # this is to distinguish between *our* state and the underlying connection state
+ @cli_state = CLI_RUNNING # this is to distinguish between *our* state and the underlying connection state
+ # this is the last status update we've received from the underlying connection
+ @last_cnx_state = Zookeeper::ZOO_CLOSED_STATE
+
+ @retry_duration = opts.fetch(:retry_duration, nil).to_i
+
@fork_subs = [
ForkHook.prepare_for_fork(method(:pause_before_fork_in_parent)),
ForkHook.after_fork_in_parent(method(:resume_after_fork_in_parent)),
ForkHook.after_fork_in_child(method(:reopen)),
]
+ ObjectSpace.define_finalizer(self, self.class.finalizer(@fork_subs))
+
yield self if block_given?
- @mutex.synchronize do
- connect if opts.fetch(:connect, true)
- end
+ connect if opts.fetch(:connect, true)
end
+ def self.finalizer(hooks)
+ proc { hooks.each(&:unregister) }
+ end
+
# @option opts [Fixnum] :timeout how long we will wait for the connection
# to be established. If timeout is nil, we will wait forever: *use
# carefully*.
def connect(opts={})
- @mutex.synchronize do
- return if @cnx
- timeout = opts.fetch(:timeout, @connection_timeout)
- @cnx = create_connection(@host, timeout, @event_handler.get_default_watcher_block)
- end
+ @mutex.synchronize { unlocked_connect(opts) }
end
# (see Base#reopen)
def reopen(timeout=nil)
# If we've forked, then we can call all sorts of normally dangerous
@@ -176,13 +203,14 @@
# ok, just to sanity check here
raise "[BUG] we hit the fork-reopening code in JRuby!!" if defined?(::JRUBY_VERSION)
logger.debug { "reopening everything, fork detected!" }
- @mutex = Monitor.new
- @pid = Process.pid
- @cli_state = :running # reset state to running if we were paused
+ @mutex = Mutex.new
+ @cond = ConditionVariable.new
+ @pid = Process.pid
+ @cli_state = CLI_RUNNING # reset state to running if we were paused
old_cnx, @cnx = @cnx, nil
old_cnx.close! if old_cnx # && !old_cnx.closed?
@mutex.synchronize do
@@ -193,15 +221,15 @@
# extra paranoid
@event_handler.reopen_after_fork!
@threadpool.reopen_after_fork! # prune dead threadpool threads after a fork()
- connect
+ unlocked_connect
end
else
@mutex.synchronize do
- if @cli_state == :paused
+ if @cli_state == CLI_PAUSED
# XXX: what to do in this case? does it matter?
end
logger.debug { "reopening, no fork detected" }
@cnx.reopen(timeout) # ok, we werent' forked, so just reopen
@@ -218,27 +246,36 @@
# if a ping is not received within 20 seconds, the session is considered dead
# and must be re-established so be sure to call {#resume_after_fork_in_parent}
# before that deadline, or you will have to re-establish your session.
#
# @raise [InvalidStateError] when called and not in running? state
+ # @private
def pause_before_fork_in_parent
@mutex.synchronize do
- raise InvalidStateError, "client must be running? when you call #{__method__}" unless running?
- @cli_state = :paused
+ raise InvalidStateError, "client must be running? when you call #{__method__}" unless (@cli_state == CLI_RUNNING)
+ @cli_state = CLI_PAUSED
+
+ logger.debug { "#{self.class}##{__method__}" }
+
+ @cond.broadcast
end
- logger.debug { "#{self.class}##{__method__}" }
+
[@event_handler, @threadpool, @cnx].each(&:pause_before_fork_in_parent)
end
+ # @private
def resume_after_fork_in_parent
@mutex.synchronize do
- raise InvalidStateError, "client must be paused? when you call #{__method__}" unless paused?
- @cli_state = :running
- end
+ raise InvalidStateError, "client must be paused? when you call #{__method__}" unless (@cli_state == CLI_PAUSED)
+ @cli_state = CLI_RUNNING
- logger.debug { "#{self.class}##{__method__}" }
- [@cnx, @event_handler, @threadpool].each(&:resume_after_fork_in_parent)
+ logger.debug { "#{self.class}##{__method__}" }
+
+ [@cnx, @event_handler, @threadpool].each(&:resume_after_fork_in_parent)
+
+ @cond.broadcast
+ end
end
# (see Base#close!)
#
# @note We will make our best effort to do the right thing if you call
@@ -248,12 +285,13 @@
# threadpool.
#
def close!
@mutex.synchronize do
return if [:closed, :close_requested].include?(@cli_state)
-# logger.debug { "moving to :close_requested state" }
- @cli_state = :close_requested
+ logger.debug { "moving to :close_requested state" }
+ @cli_state = CLI_CLOSE_REQ
+ @cond.broadcast
end
on_tpool = on_threadpool?
# Ok, so the threadpool will wait up to N seconds while joining each thread.
@@ -267,12 +305,13 @@
shutdown_thread = Thread.new do
@threadpool.shutdown(10)
super
@mutex.synchronize do
-# logger.debug { "moving to :closed state" }
- @cli_state = :closed
+ logger.debug { "moving to :closed state" }
+ @cli_state = CLI_CLOSED
+ @cond.broadcast
end
end
on_tpool ? shutdown_thread : shutdown_thread.join
end
@@ -296,64 +335,101 @@
end
# @private
def raw_event_handler(event)
return unless event.session_event?
+
+ @mutex.synchronize do
+ @last_cnx_state = event.state
- if event.client_invalid?
- return unless @reconnect
+ if event.client_invalid? and @reconnect and not dead_or_dying?
+ logger.error { "Got event #{event.state_name}, calling reopen(0)! things may be messed up until this works itself out!" }
- @mutex.synchronize do
- unless dead_or_dying? # a legitimate shutdown case
-
- logger.error { "Got event #{event.state_name}, calling reopen(0)! things may be messed up until this works itself out!" }
-
- # reopen(0) means that we don't want to wait for the connection
- # to reach the connected state before returning as we're on the
- # event thread.
- reopen(0)
- end
+ # reopen(0) means that we don't want to wait for the connection
+ # to reach the connected state before returning as we're on the
+ # event thread.
+ reopen(0)
end
+
+ @cond.broadcast # wake anyone waiting for a connection state update
end
rescue Exception => e
logger.error { "BUG: Exception caught in raw_event_handler: #{e.to_std_format}" }
end
def closed?
- return true if @mutex.synchronize { @cli_state == :closed }
+ return true if @mutex.synchronize { @cli_state == CLI_CLOSED }
super
end
# are we in running (not-paused) state?
def running?
- @mutex.synchronize { @cli_state == :running }
+ @mutex.synchronize { @cli_state == CLI_RUNNING }
end
# are we in paused state?
def paused?
- @mutex.synchronize { @cli_state == :paused }
+ @mutex.synchronize { @cli_state == CLI_PAUSED }
end
# has shutdown time arrived?
def close_requested?
- @mutex.synchronize { @cli_state == :close_requested }
+ @mutex.synchronize { @cli_state == CLI_CLOSE_REQ }
end
protected
# in the threaded version of the client, synchronize access around cnx
# so that callers don't wind up with a nil object when we're in the middle
# of reopening it
def cnx
@mutex.synchronize { @cnx }
end
- # @private
+ def call_and_check_rc(meth, opts)
+ if retry_duration = (opts.delete(:retry_duration) || @retry_duration)
+ begin
+ super(meth, opts)
+ rescue Exceptions::Retryable => e
+ time_to_stop = Time.now + retry_duration
+
+ wait_until_connected_or_dying(retry_duration)
+
+ if (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) || (Time.now > time_to_stop) || (@cli_state != CLI_RUNNING)
+ raise e
+ else
+ retry
+ end
+ end
+ else
+ super
+ end
+ end
+
+ def wait_until_connected_or_dying(timeout)
+ time_to_stop = Time.now + timeout
+
+ @mutex.synchronize do
+ while (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) && (Time.now < time_to_stop) && (@cli_state == CLI_RUNNING)
+ @cond.wait(timeout)
+ end
+
+ logger.debug { "@last_cnx_state: #{@last_cnx_state}, time_left? #{Time.now.to_f < time_to_stop.to_f}, @cli_state: #{@cli_state.inspect}" }
+ end
+ end
+
def create_connection(*args)
::Zookeeper.new(*args)
end
def dead_or_dying?
- (@cli_state == :close_requested) || (@cli_state == :closed)
+ (@cli_state == CLI_CLOSE_REQ) || (@cli_state == CLI_CLOSED)
+ end
+
+ private
+ def unlocked_connect(opts={})
+ return if @cnx
+ timeout = opts.fetch(:timeout, @connection_timeout)
+ @cnx = create_connection(@host, timeout, @event_handler.get_default_watcher_block)
end
end
end
end