lib/zk/client/threaded.rb in zk-1.5.3 vs lib/zk/client/threaded.rb in zk-1.6.0

- old
+ new

@@ -345,11 +345,11 @@ end on_tpool ? shutdown_thread : shutdown_thread.join(30) end - # {see Base#close} + # {see ZK::Client::Base#close} def close super subs, @fork_subs = @fork_subs, [] subs.each(&:unsubscribe) nil @@ -363,10 +363,37 @@ # (see Threadpool#on_exception) def on_exception(&blk) @threadpool.on_exception(&blk) end + def closed? + return true if @mutex.synchronize { @client_state == CLOSED } + super + end + + # this is where the :on option is implemented for {Base#create} + def create(path, *args) + opts = args.extract_options! + + or_opt = opts.delete(:or) + args << opts + + if or_opt + hash = parse_create_args(path, *args) + + raise ArgumentError, "valid options for :or are nil or :set, not #{or_opt.inspect}" unless or_opt == :set + raise ArgumentError, "you cannot create an ephemeral node when using the :or option" if hash[:ephemeral] + raise ArgumentError, "you cannot create an sequence node when using the :or option" if hash[:sequence] + + mkdir_p(path, :data => hash[:data]) + path + else + # ok, none of our business, hand it up to mangement + super(path, *args) + end + end + # @private def raw_event_handler(event) return unless event.session_event? @mutex.synchronize do @@ -376,47 +403,51 @@ end rescue Exception => e logger.error { "BUG: Exception caught in raw_event_handler: #{e.to_std_format}" } end - def closed? - return true if @mutex.synchronize { @client_state == CLOSED } - super - end - - # are we in running (not-paused) state? # @private - def running? - @mutex.synchronize { @client_state == RUNNING } - end - - # are we in paused state? - # @private - def paused? - @mutex.synchronize { @client_state == PAUSED } - end - - # has shutdown time arrived? - # @private - def close_requested? - @mutex.synchronize { @client_state == CLOSE_REQ } - end - - # @private def wait_until_connected_or_dying(timeout) time_to_stop = Time.now + timeout @mutex.synchronize do - while (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) && (Time.now < time_to_stop) && (@client_state == RUNNING) - @cond.wait(timeout) + while true + now = Time.now + break if (@last_cnx_state == Zookeeper::ZOO_CONNECTED_STATE) || (now > time_to_stop) || (@client_state != RUNNING) + deadline = time_to_stop.to_f - now.to_f + @cond.wait(deadline) end - logger.debug { "@last_cnx_state: #{@last_cnx_state.inspect}, time_left? #{Time.now.to_f < time_to_stop.to_f}, @client_state: #{@client_state.inspect}" } + logger.debug { "#{__method__} @last_cnx_state: #{@last_cnx_state.inspect}, time_left? #{Time.now.to_f < time_to_stop.to_f}, @client_state: #{@client_state.inspect}" } end end + # @private + def client_state + @mutex.synchronize { @client_state } + end + private + # are we in running (not-paused) state? + def running? + @client_state == RUNNING + end + + # are we in paused state? + def paused? + @client_state == PAUSED + end + + # has shutdown time arrived? + def close_requested? + @client_state == CLOSE_REQ + end + + def dead_or_dying? + (@client_state == CLOSE_REQ) || (@client_state == CLOSED) + end + # this is just here so we can see it in stack traces def reopen_after_session_expired reopen end @@ -431,15 +462,15 @@ Thread.current[:name] = 'reconnect' while @reconnect # too clever? @mutex.synchronize do # either we havne't seen a valid session update from this # connection yet, or we're doing fine, so just wait - @cond.wait_while { !seen_session_state_event? or (valid_session_state? and (@client_state == RUNNING)) } + @cond.wait_while { !seen_session_state_event? or (valid_session_state? and running?) } # we've entered into a non-running state, so we exit # note: need to restart this thread after a fork in parent - if @client_state != RUNNING + unless running? logger.debug { "session failure watcher thread exiting, @client_state: #{@client_state}" } return end # if we know that this session was valid once and it has now @@ -455,12 +486,10 @@ logger.debug { "reopen returned: #{rv.inspect}" } end end end - ensure - logger.debug { "reconnect thread exiting" } end def join_and_clear_reconnect_thread return unless @reconnect_thread begin @@ -486,11 +515,11 @@ rescue Exceptions::Retryable => e time_to_stop = Time.now + retry_duration wait_until_connected_or_dying(retry_duration) - if (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) || (Time.now > time_to_stop) || (@client_state != RUNNING) + if (@last_cnx_state != Zookeeper::ZOO_CONNECTED_STATE) || (Time.now > time_to_stop) || !running? raise e else retry end end @@ -515,13 +544,9 @@ @last_cnx_state and (@last_cnx_state >= 0) end def create_connection(*args) ::Zookeeper.new(*args) - end - - def dead_or_dying? - (@client_state == CLOSE_REQ) || (@client_state == CLOSED) end def unlocked_connect(opts={}) return if @cnx timeout = opts.fetch(:timeout, @connection_timeout)