lib/zk/client/threaded.rb in zk-1.2.0 vs lib/zk/client/threaded.rb in zk-1.3.0

- old
+ new

@@ -108,14 +108,18 @@ # it will not affect the others. # # * see {https://github.com/slyphon/zk/wiki/EventDeliveryModel the wiki} for a # discussion and demonstration of the effect of this setting. # - # @option opts [Fixnum] :timeout how long we will wait for the connection - # to be established. If timeout is nil, we will wait forever: *use - # carefully*. + # @option opts [Fixnum] :timeout used as a default for calls to {#reopen} + # and {#connect} (including the initial default immediate connection) # + # @option opts [true,false] :connect (true) Immediately connect to the + # server. It may be useful to pass false if you wish to do callback + # setup without passing a block. You must then call {#connect} + # explicitly. + # # @yield [self] calls the block with the new instance after the event # handler and threadpool have been set up, but before any connections # have been made. This allows the client to register watchers for # session events like `connected`. You *cannot* perform any other # operations with the client as you will get a NoMethodError (the @@ -128,27 +132,60 @@ super(host, opts) tp_size = opts.fetch(:threadpool_size, DEFAULT_THREADPOOL_SIZE) @threadpool = Threadpool.new(tp_size) - @session_timeout = opts.fetch(:timeout, DEFAULT_TIMEOUT) # maybe move this into superclass? + @connection_timeout = opts.fetch(:timeout, DEFAULT_TIMEOUT) # maybe move this into superclass? @event_handler = EventHandler.new(self, opts) @reconnect = opts.fetch(:reconnect, true) - @mutex = Mutex.new + @mutex = Monitor.new @close_requested = false yield self if block_given? - @cnx = create_connection(host, @session_timeout, @event_handler.get_default_watcher_block) + @mutex.synchronize do + connect if opts.fetch(:connect, true) + end end + # @option opts [Fixnum] :timeout how long we will wait for the connection + # to be established. If timeout is nil, we will wait forever: *use + # carefully*. + def connect(opts={}) + @mutex.synchronize do + return if @cnx + timeout = opts.fetch(:timeout, @connection_timeout) + @cnx = create_connection(@host, timeout, @event_handler.get_default_watcher_block) + end + end + # (see Base#reopen) def reopen(timeout=nil) + # If we've forked, then we can call all sorts of normally dangerous + # stuff because we're the only thread. + if forked? + # ok, just to sanity check here + raise "[BUG] we hit the fork-reopening code in JRuby!!" if defined?(::JRUBY_VERSION) + + logger.debug { "#{self.class}##{__method__} reopening everything, fork detected!" } + + @mutex = Monitor.new + @threadpool.reopen_after_fork! # prune dead threadpool threads after a fork() + @event_handler.reopen_after_fork! + @pid = Process.pid + + old_cnx, @cnx = @cnx, nil + old_cnx.close! if old_cnx # && !old_cnx.closed? + else + logger.debug { "#{self.class}##{__method__} not reopening, no fork detected" } + end + @mutex.synchronize { @close_requested = false } - super + connect + state end # (see Base#close!) # # @note We will make our best effort to do the right thing if you call