lib/zk/client/threaded.rb in zk-1.5.0 vs lib/zk/client/threaded.rb in zk-1.5.1

- old
+ new

@@ -41,10 +41,19 @@ include Conveniences include Logging DEFAULT_THREADPOOL_SIZE = 1 + # @private + module Constants + CLI_RUNNING = :running + CLI_PAUSED = :paused + CLI_CLOSE_REQ = :close_requested + CLI_CLOSED = :closed + end + include Constants + # Construct a new threaded client. # # Pay close attention to the `:thread` option, and have a look at the # [EventDeliveryModel](https://github.com/slyphon/zk/wiki/EventDeliveryModel) # page in the wiki for a discussion of the relative advantages and @@ -82,10 +91,23 @@ # connection. You *almost definitely* want to leave this at the default. # The only reason not to is if you already have a handler registered # that does something application specific, and you want to avoid a # conflict. # + # @option opts [Fixnum] :retry_duration (nil) for how long (in seconds) + # should we wait to re-attempt a synchronous operation after we receive a + # ZK::Exceptions::Retryable error. This exception (or really, group of + # exceptions) is raised when there has been an unintentional network + # connection or session loss, so retrying an operation in this situation + # is like saying "If we are disconnected, how long should we wait for the + # connection to become available before attempting this operation?" + # + # The default `nil` means automatic retry is not attempted. + # + # This is a global option, and will be used for all operations on this + # connection, however it can be overridden for any individual operation. + # # @option opts [:single,:per_callback] :thread (:single) choose your event # delivery model: # # * `:single`: There is one thread, and only one callback is called at # a time.
This is the default mode (for now), and will provide the most @@ -138,36 +160,41 @@ @event_handler = EventHandler.new(self, opts) @reconnect = opts.fetch(:reconnect, true) @mutex = Monitor.new - @cond = @mutex.new_cond + @cond = @mutex.new_cond - @cli_state = :running # this is to distinguish between *our* state and the underlying connection state + @cli_state = CLI_RUNNING # this is to distinguish between *our* state and the underlying connection state + # this is the last status update we've received from the underlying connection + @last_cnx_state = Zookeeper::ZOO_CLOSED_STATE + + @retry_duration = opts.fetch(:retry_duration, nil).to_i + @fork_subs = [ ForkHook.prepare_for_fork(method(:pause_before_fork_in_parent)), ForkHook.after_fork_in_parent(method(:resume_after_fork_in_parent)), ForkHook.after_fork_in_child(method(:reopen)), ] + ObjectSpace.define_finalizer(self, self.class.finalizer(@fork_subs)) + yield self if block_given? - @mutex.synchronize do - connect if opts.fetch(:connect, true) - end + connect if opts.fetch(:connect, true) end + def self.finalizer(hooks) + proc { hooks.each(&:unregister) } + end + # @option opts [Fixnum] :timeout how long we will wait for the connection # to be established. If timeout is nil, we will wait forever: *use # carefully*. def connect(opts={}) - @mutex.synchronize do - return if @cnx - timeout = opts.fetch(:timeout, @connection_timeout) - @cnx = create_connection(@host, timeout, @event_handler.get_default_watcher_block) - end + @mutex.synchronize { unlocked_connect(opts) } end # (see Base#reopen) def reopen(timeout=nil) # If we've forked, then we can call all sorts of normally dangerous @@ -176,13 +203,14 @@ # ok, just to sanity check here raise "[BUG] we hit the fork-reopening code in JRuby!!" if defined?(::JRUBY_VERSION) logger.debug { "reopening everything, fork detected!" 
} - @mutex = Monitor.new - @pid = Process.pid - @cli_state = :running # reset state to running if we were paused + @mutex = Mutex.new + @cond = ConditionVariable.new + @pid = Process.pid + @cli_state = CLI_RUNNING # reset state to running if we were paused old_cnx, @cnx = @cnx, nil old_cnx.close! if old_cnx # && !old_cnx.closed? @mutex.synchronize do @@ -193,15 +221,15 @@ # extra paranoid @event_handler.reopen_after_fork! @threadpool.reopen_after_fork! # prune dead threadpool threads after a fork() - connect + unlocked_connect end else @mutex.synchronize do - if @cli_state == :paused + if @cli_state == CLI_PAUSED # XXX: what to do in this case? does it matter? end logger.debug { "reopening, no fork detected" } @cnx.reopen(timeout) # ok, we weren't forked, so just reopen @@ -218,27 +246,36 @@ # if a ping is not received within 20 seconds, the session is considered dead # and must be re-established so be sure to call {#resume_after_fork_in_parent} # before that deadline, or you will have to re-establish your session. # # @raise [InvalidStateError] when called and not in running? state + # @private def pause_before_fork_in_parent @mutex.synchronize do - raise InvalidStateError, "client must be running? when you call #{__method__}" unless running? - @cli_state = :paused + raise InvalidStateError, "client must be running? when you call #{__method__}" unless (@cli_state == CLI_RUNNING) + @cli_state = CLI_PAUSED + + logger.debug { "#{self.class}##{__method__}" } + + @cond.broadcast end - logger.debug { "#{self.class}##{__method__}" } + [@event_handler, @threadpool, @cnx].each(&:pause_before_fork_in_parent) end + # @private def resume_after_fork_in_parent @mutex.synchronize do - raise InvalidStateError, "client must be paused? when you call #{__method__}" unless paused? - @cli_state = :running - end + raise InvalidStateError, "client must be paused? 
when you call #{__method__}" unless (@cli_state == CLI_PAUSED) + @cli_state = CLI_RUNNING - logger.debug { "#{self.class}##{__method__}" } - [@cnx, @event_handler, @threadpool].each(&:resume_after_fork_in_parent) + logger.debug { "#{self.class}##{__method__}" } + + [@cnx, @event_handler, @threadpool].each(&:resume_after_fork_in_parent) + + @cond.broadcast + end end # (see Base#close!) # # @note We will make our best effort to do the right thing if you call @@ -248,12 +285,13 @@ # threadpool. # def close! @mutex.synchronize do return if [:closed, :close_requested].include?(@cli_state) -# logger.debug { "moving to :close_requested state" } - @cli_state = :close_requested + logger.debug { "moving to :close_requested state" } + @cli_state = CLI_CLOSE_REQ + @cond.broadcast end on_tpool = on_threadpool? # Ok, so the threadpool will wait up to N seconds while joining each thread. @@ -267,12 +305,13 @@ shutdown_thread = Thread.new do @threadpool.shutdown(10) super @mutex.synchronize do -# logger.debug { "moving to :closed state" } - @cli_state = :closed + logger.debug { "moving to :closed state" } + @cli_state = CLI_CLOSED + @cond.broadcast end end on_tpool ? shutdown_thread : shutdown_thread.join end @@ -296,64 +335,101 @@ end # @private def raw_event_handler(event) return unless event.session_event? + + @mutex.synchronize do + @last_cnx_state = event.state - if event.client_invalid? - return unless @reconnect + if event.client_invalid? and @reconnect and not dead_or_dying? + logger.error { "Got event #{event.state_name}, calling reopen(0)! things may be messed up until this works itself out!" } - @mutex.synchronize do - unless dead_or_dying? # a legitimate shutdown case - - logger.error { "Got event #{event.state_name}, calling reopen(0)! things may be messed up until this works itself out!" } - - # reopen(0) means that we don't want to wait for the connection - # to reach the connected state before returning as we're on the - # event thread. 
- reopen(0) - end + # reopen(0) means that we don't want to wait for the connection + # to reach the connected state before returning as we're on the + # event thread. + reopen(0) end + + @cond.broadcast # wake anyone waiting for a connection state update end rescue Exception => e logger.error { "BUG: Exception caught in raw_event_handler: #{e.to_std_format}" } end def closed? - return true if @mutex.synchronize { @cli_state == :closed } + return true if @mutex.synchronize { @cli_state == CLI_CLOSED } super end # are we in running (not-paused) state? def running? - @mutex.synchronize { @cli_state == :running } + @mutex.synchronize { @cli_state == CLI_RUNNING } end # are we in paused state? def paused? - @mutex.synchronize { @cli_state == :paused } + @mutex.synchronize { @cli_state == CLI_PAUSED } end # has shutdown time arrived? def close_requested? - @mutex.synchronize { @cli_state == :close_requested } + @mutex.synchronize { @cli_state == CLI_CLOSE_REQ } end protected # in the threaded version of the client, synchronize access around cnx # so that callers don't wind up with a nil object when we're in the middle # of reopening it def cnx @mutex.synchronize { @cnx } end - # @private + def call_and_check_rc(meth, opts) + if retry_duration = (opts.delete(:retry_duration) || @retry_duration) + begin + super(meth, opts) + rescue Exceptions::Retryable => e + time_to_stop = Time.now + retry_duration + + wait_until_connected_or_dying(retry_duration) + + if (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) || (Time.now > time_to_stop) || (@cli_state != CLI_RUNNING) + raise e + else + retry + end + end + else + super + end + end + + def wait_until_connected_or_dying(timeout) + time_to_stop = Time.now + timeout + + @mutex.synchronize do + while (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) && (Time.now < time_to_stop) && (@cli_state == CLI_RUNNING) + @cond.wait(timeout) + end + + logger.debug { "@last_cnx_state: #{@last_cnx_state}, time_left? 
#{Time.now.to_f < time_to_stop.to_f}, @cli_state: #{@cli_state.inspect}" } + end + end + def create_connection(*args) ::Zookeeper.new(*args) end def dead_or_dying? - (@cli_state == :close_requested) || (@cli_state == :closed) + (@cli_state == CLI_CLOSE_REQ) || (@cli_state == CLI_CLOSED) + end + + private + def unlocked_connect(opts={}) + return if @cnx + timeout = opts.fetch(:timeout, @connection_timeout) + @cnx = create_connection(@host, timeout, @event_handler.get_default_watcher_block) end end end end