lib/zk/client/threaded.rb in zk-1.2.0 vs lib/zk/client/threaded.rb in zk-1.3.0
- old
+ new
@@ -108,14 +108,18 @@
# it will not affect the others.
#
# * see {https://github.com/slyphon/zk/wiki/EventDeliveryModel the wiki} for a
# discussion and demonstration of the effect of this setting.
#
- # @option opts [Fixnum] :timeout how long we will wait for the connection
- # to be established. If timeout is nil, we will wait forever: *use
- # carefully*.
+ # @option opts [Fixnum] :timeout used as a default for calls to {#reopen}
+ # and {#connect} (including the initial default immediate connection)
#
+ # @option opts [true,false] :connect (true) Immediately connect to the
+ # server. It may be useful to pass false if you wish to do callback
+ # setup without passing a block. You must then call {#connect}
+ # explicitly.
+ #
# @yield [self] calls the block with the new instance after the event
# handler and threadpool have been set up, but before any connections
# have been made. This allows the client to register watchers for
# session events like `connected`. You *cannot* perform any other
# operations with the client as you will get a NoMethodError (the
@@ -128,27 +132,60 @@
super(host, opts)
tp_size = opts.fetch(:threadpool_size, DEFAULT_THREADPOOL_SIZE)
@threadpool = Threadpool.new(tp_size)
- @session_timeout = opts.fetch(:timeout, DEFAULT_TIMEOUT) # maybe move this into superclass?
+ @connection_timeout = opts.fetch(:timeout, DEFAULT_TIMEOUT) # maybe move this into superclass?
@event_handler = EventHandler.new(self, opts)
@reconnect = opts.fetch(:reconnect, true)
- @mutex = Mutex.new
+ @mutex = Monitor.new
@close_requested = false
yield self if block_given?
- @cnx = create_connection(host, @session_timeout, @event_handler.get_default_watcher_block)
+ @mutex.synchronize do
+ connect if opts.fetch(:connect, true)
+ end
end
+ # @option opts [Fixnum] :timeout how long we will wait for the connection
+ # to be established. If timeout is nil, we will wait forever: *use
+ # carefully*.
+ def connect(opts={})
+ @mutex.synchronize do
+ return if @cnx
+ timeout = opts.fetch(:timeout, @connection_timeout)
+ @cnx = create_connection(@host, timeout, @event_handler.get_default_watcher_block)
+ end
+ end
+
# (see Base#reopen)
def reopen(timeout=nil)
+ # If we've forked, then we can call all sorts of normally dangerous
+ # stuff because we're the only thread.
+ if forked?
+ # ok, just to sanity check here
+ raise "[BUG] we hit the fork-reopening code in JRuby!!" if defined?(::JRUBY_VERSION)
+
+ logger.debug { "#{self.class}##{__method__} reopening everything, fork detected!" }
+
+ @mutex = Monitor.new
+ @threadpool.reopen_after_fork! # prune dead threadpool threads after a fork()
+ @event_handler.reopen_after_fork!
+ @pid = Process.pid
+
+ old_cnx, @cnx = @cnx, nil
+ old_cnx.close! if old_cnx # && !old_cnx.closed?
+ else
+ logger.debug { "#{self.class}##{__method__} not reopening, no fork detected" }
+ end
+
@mutex.synchronize { @close_requested = false }
- super
+ connect
+ state
end
# (see Base#close!)
#
# @note We will make our best effort to do the right thing if you call