lib/zk/threaded_callback.rb in zk-1.4.2 vs lib/zk/threaded_callback.rb in zk-1.5.0
- old
+ new
@@ -3,69 +3,153 @@
# Repsonds to `call` but places call on a queue to be delivered by a thread.
# You will not have a useful return value from `call` so this is only useful
# for background processing.
class ThreadedCallback
include ZK::Logging
+ include ZK::Exceptions
attr_reader :callback
def initialize(callback=nil, &blk)
@callback = callback || blk
- @mutex = Monitor.new
- @mutex.synchronize do
- @running = true
- reopen_after_fork!
- end
+ @state = :paused
+ reopen_after_fork!
end
def running?
- @mutex.synchronize { @running }
+ @mutex.synchronize { @state == :running }
end
+ # @private
+ def alive?
+ @thread && @thread.alive?
+ end
+
# how long to wait on thread shutdown before we return
- def shutdown(timeout=2)
- @mutex.synchronize do
- @running = false
- @queue.push(KILL_TOKEN)
- return unless @thread
- unless @thread.join(2)
- logger.error { "#{self.class} timed out waiting for dispatch thread, callback: #{callback.inspect}" }
- end
+ def shutdown(timeout=5)
+# logger.debug { "#{self.class}##{__method__}" }
+
+ @mutex.lock
+ begin
+ return true if @state == :shutdown
+
+ @state = :shutdown
+ @cond.broadcast
+ ensure
+ @mutex.unlock rescue nil
end
+
+ return true unless @thread
+
+ unless @thread.join(timeout) == @thread
+ logger.error { "#{self.class} timed out waiting for dispatch thread, callback: #{callback.inspect}" }
+ return false
+ end
+
+ true
end
def call(*args)
- @queue.push(args)
+ @mutex.lock
+ begin
+ @array << args
+ @cond.broadcast
+ ensure
+ @mutex.unlock rescue nil
+ end
end
# called after a fork to replace a dead delivery thread
# special case, there should be ONLY ONE THREAD RUNNING,
# (the one that survived the fork)
#
# @private
def reopen_after_fork!
- return unless @running
- return if @thread and @thread.alive?
- @mutex = Monitor.new
- @queue = Queue.new
- @thread = spawn_dispatch_thread()
+# logger.debug { "#{self.class}##{__method__}" }
+
+ unless @state == :paused
+ raise InvalidStateError, "state should have been :paused, not: #{@state.inspect}"
+ end
+
+ if @thread
+ raise InvalidStateError, "WTF?! did you fork in a callback? my thread was alive!" if @thread.alive?
+ @thread = nil
+ end
+
+ @mutex = Mutex.new
+ @cond = ConditionVariable.new
+ @array = []
+ resume_after_fork_in_parent
end
+ # shuts down the event delivery thread, but keeps the queue so we can continue
+ # delivering queued events when {#resume_after_fork_in_parent} is called
+ def pause_before_fork_in_parent
+ @mutex.lock
+ begin
+ raise InvalidStateError, "@state was not :running, @state: #{@state.inspect}" if @state != :running
+ return if @state == :paused
+
+ @state = :paused
+ @cond.broadcast
+ ensure
+ @mutex.unlock rescue nil
+ end
+
+ return unless @thread
+
+ @thread.join
+ @thread = nil
+ end
+
+ def resume_after_fork_in_parent
+ @mutex.lock
+ begin
+ raise InvalidStateError, "@state was not :paused, @state: #{@state.inspect}" if @state != :paused
+ raise InvalidStateError, "@thread was not nil! #{@thread.inspect}" if @thread
+
+ @state = :running
+# logger.debug { "#{self.class}##{__method__} spawning dispatch thread" }
+ spawn_dispatch_thread
+ ensure
+ @mutex.unlock rescue nil
+ end
+ end
+
protected
+ # intentionally *not* synchronized
def spawn_dispatch_thread
- Thread.new do
- while running?
- args = @queue.pop
- break if args == KILL_TOKEN
- begin
- callback.call(*args)
- rescue Exception => e
- logger.error { "error caught in handler for path: #{path.inspect}, interests: #{interests.inspect}" }
- logger.error { e.to_std_format }
+ @thread = Thread.new(&method(:dispatch_thread_body))
+ end
+
+ def dispatch_thread_body
+ Thread.current.abort_on_exception = true
+ while true
+ args = nil
+
+ @mutex.lock
+ begin
+ @cond.wait(@mutex) while @array.empty? and @state == :running
+
+ if @state != :running
+# logger.warn { "ThreadedCallback, state is #{@state.inspect}, returning" }
+ return
end
+
+ args = @array.shift
+ ensure
+ @mutex.unlock rescue nil
end
+
+ begin
+ callback.call(*args)
+ rescue Exception => e
+ logger.error { e.to_std_format }
+ end
end
+# ensure
+# logger.debug { "#{self.class}##{__method__} returning" }
end
end
end