Sha256: cad7414b846a2e43137c3b6947c33bc49a88f8355ec7b107bd7ef9b0c3815c5f

Contents?: true

Size: 1.85 KB

Versions: 5

Compression:

Stored size: 1.85 KB

Contents

module ZK
  # A class that encapsulates the queue + thread that calls a callback.
  # Responds to `call` but only places the arguments on a queue to be
  # delivered by a dedicated dispatch thread. You will not have a useful
  # return value from `call`, so this is only useful for background
  # processing.
  class ThreadedCallback
    include ZK::Logging

    # @return [#call] the object invoked by the dispatch thread
    attr_reader :callback

    # @param callback [#call, nil] object to invoke for each queued call;
    #   when nil, the given block is used instead
    # @yield [*args] used as the callback when `callback` is nil
    def initialize(callback=nil, &blk)
      @callback = callback || blk
      @mutex = Monitor.new

      @mutex.synchronize do
        @running = true
        reopen_after_fork!
      end
    end

    # @return [Boolean] true until {#shutdown} has been called
    def running?
      @mutex.synchronize { @running }
    end

    # Stops the dispatch thread: clears the running flag, enqueues KILL_TOKEN
    # to wake the thread if it is blocked on the queue, then waits for it to
    # exit.
    #
    # @param timeout [Numeric] how long (in seconds) to wait on thread
    #   shutdown before we log an error and return
    def shutdown(timeout=2)
      thread = @mutex.synchronize do
        @running = false
        @queue.push(KILL_TOKEN)
        @thread
      end

      return unless thread

      # Join *outside* the lock: the dispatch loop calls `running?`, which
      # needs the same monitor. Joining while holding it would keep a thread
      # that is busy in a callback from ever observing @running == false, so
      # every such shutdown would stall for the full timeout.
      unless thread.join(timeout)
        logger.error { "#{self.class} timed out waiting for dispatch thread, callback: #{callback.inspect}" }
      end
    end

    # Enqueues `args` for asynchronous delivery to the callback. The return
    # value is not the callback's result.
    #
    # @param args [Array] arguments later splatted into `callback.call`
    def call(*args)
      @queue.push(args)
    end

    # called after a fork to replace a dead delivery thread
    # special case, there should be ONLY ONE THREAD RUNNING,
    # (the one that survived the fork)
    #
    # Does nothing if we have been shut down or the dispatch thread is still
    # alive; otherwise installs a fresh monitor, queue and dispatch thread.
    #
    # @private
    def reopen_after_fork!
      return unless @running
      return if @thread && @thread.alive?
      @mutex = Monitor.new
      @queue = Queue.new
      @thread = spawn_dispatch_thread
    end

    protected
      # Starts the thread that pops argument lists off the queue and hands
      # them to the callback until KILL_TOKEN is popped or `running?` turns
      # false.
      #
      # @return [Thread] the newly started dispatch thread
      def spawn_dispatch_thread
        Thread.new do
          while running?
            args = @queue.pop
            break if args == KILL_TOKEN
            begin
              callback.call(*args)
            rescue Exception => e
              # Deliberately broad: a failing callback must not take the
              # dispatch thread down with it. Only names that exist in this
              # class are referenced here -- raising from inside the handler
              # would kill the thread and hide the original error.
              logger.error { "#{self.class} caught error in callback: #{callback.inspect}, args: #{args.inspect}" }
              # NOTE(review): to_std_format is not defined in this file --
              # presumably a project extension on Exception; confirm.
              logger.error { e.to_std_format }
            end
          end
        end
      end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
zk-1.4.2 lib/zk/threaded_callback.rb
zk-1.4.1 lib/zk/threaded_callback.rb
zk-1.4.0 lib/zk/threaded_callback.rb
zk-1.3.1 lib/zk/threaded_callback.rb
zk-1.3.0 lib/zk/threaded_callback.rb