Sha256: cad7414b846a2e43137c3b6947c33bc49a88f8355ec7b107bd7ef9b0c3815c5f
Contents?: true
Size: 1.85 KB
Versions: 5
Compression:
Stored size: 1.85 KB
Contents
module ZK
  # A class that encapsulates the queue + thread that calls a callback.
  # Responds to `call` but places the call on a queue to be delivered by a
  # thread. You will not have a useful return value from `call`, so this is
  # only useful for background processing.
  class ThreadedCallback
    include ZK::Logging

    # The object (or block) that the dispatch thread invokes.
    attr_reader :callback

    # @param callback [#call, nil] object invoked with each queued argument list
    # @yield used as the callback when `callback` is nil
    def initialize(callback=nil, &blk)
      @callback = callback || blk
      @mutex    = Monitor.new

      @mutex.synchronize do
        @running = true
        reopen_after_fork!
      end
    end

    # @return [Boolean] true until `shutdown` has been called
    def running?
      @mutex.synchronize { @running }
    end

    # Stops the dispatch thread and waits for it to exit.
    #
    # Calls already queued ahead of the kill token are still delivered before
    # the thread exits. The return value is not meaningful.
    #
    # @param timeout [Numeric] how long (in seconds) to wait on thread
    #   shutdown before we log an error and return
    def shutdown(timeout=2)
      thread = @mutex.synchronize do
        @running = false
        @queue.push(KILL_TOKEN)
        @thread
      end

      return unless thread

      # Join *outside* the monitor: the dispatch thread takes the same monitor
      # in `running?` after every callback, so joining while holding it would
      # block that thread and force us to wait out the full timeout.
      unless thread.join(timeout)
        logger.error { "#{self.class} timed out waiting for dispatch thread, callback: #{callback.inspect}" }
      end
    end

    # Queues `args` for asynchronous delivery to the callback.
    # The callback's own return value is never made available to the caller.
    def call(*args)
      @queue.push(args)
    end

    # called after a fork to replace a dead delivery thread
    # special case, there should be ONLY ONE THREAD RUNNING,
    # (the one that survived the fork)
    #
    # Does nothing if this instance was shut down or if the dispatch thread is
    # still alive. Otherwise the monitor, the queue and the thread are all
    # replaced with fresh objects.
    #
    # @private
    def reopen_after_fork!
      return unless @running
      return if @thread && @thread.alive?

      @mutex  = Monitor.new
      @queue  = Queue.new
      @thread = spawn_dispatch_thread
    end

    protected

    # Starts the thread that pops argument lists off the queue and hands them
    # to the callback until it sees KILL_TOKEN or `running?` turns false.
    #
    # @return [Thread]
    def spawn_dispatch_thread
      Thread.new do
        while running?
          args = @queue.pop
          break if args == KILL_TOKEN

          begin
            callback.call(*args)
          rescue Exception => e
            # Deliberately broad: a misbehaving callback must not take the
            # dispatch thread down with it. Only names that exist on this
            # class are interpolated here, so logging itself cannot raise.
            logger.error { "error caught in threaded callback: #{callback.inspect}, args: #{args.inspect}" }
            logger.error { e.to_std_format }
          end
        end
      end
    end
  end
end
Version data entries
5 entries across 5 versions & 1 rubygems
Version | Path |
---|---|
zk-1.4.2 | lib/zk/threaded_callback.rb |
zk-1.4.1 | lib/zk/threaded_callback.rb |
zk-1.4.0 | lib/zk/threaded_callback.rb |
zk-1.3.1 | lib/zk/threaded_callback.rb |
zk-1.3.0 | lib/zk/threaded_callback.rb |