Sha256: 83807f2de9e7a4a4211ed1b623d145055db2ef879a8c0c22a1826b4528e78eb5

Contents?: true

Size: 1.48 KB

Versions: 3

Compression:

Stored size: 1.48 KB

Contents

module ZK
  # A class that encapsulates the queue + thread that calls a callback.
  # Responds to `call`, but instead of invoking the callback directly it
  # places the arguments on a queue to be delivered by a dedicated thread.
  # You will not have a useful return value from `call`, so this is only
  # useful for background processing.
  class ThreadedCallback
    include ZK::Logging

    # The callable (anything responding to `call`) that the dispatch thread
    # invokes for every set of queued arguments.
    attr_reader :callback

    # Creates the callback wrapper and immediately starts its dispatch thread.
    #
    # callback - an object responding to `call`; takes precedence over the block
    # blk      - block used as the callback when no callback object is given
    def initialize(callback=nil, &blk)
      @callback = callback || blk
      @mutex = Monitor.new
      @queue = Queue.new
      @running = true
      setup_dispatch_thread
    end

    # True until `shutdown` has been called.
    def running?
      @mutex.synchronize { @running }
    end

    # Stops the dispatch thread.
    #
    # timeout - how long (in seconds) to wait on thread shutdown before we
    #           return; a timeout is logged as an error.
    def shutdown(timeout=2)
      @mutex.synchronize do
        @running = false
        # wakes the dispatch thread if it is blocked waiting on the queue
        @queue.push(KILL_TOKEN)
      end

      return unless @thread

      # A thread cannot join itself (Thread#join raises ThreadError), which
      # would happen if the callback called shutdown. The loop exits on its own
      # once the callback returns because @running is already false.
      return if @thread == Thread.current

      # Join *outside* of the monitor: the dispatch thread needs the monitor
      # for its `running?` check, so holding it here would stall the thread
      # until the join timed out.
      unless @thread.join(timeout)
        logger.error { "#{self.class} timed out waiting for dispatch thread, callback: #{callback.inspect}" }
      end
    end

    # Queues +args+ for asynchronous delivery to the callback.
    def call(*args)
      @queue.push(args)
    end

    protected
      # Starts (at most once) the thread that pops argument lists off the
      # queue and hands them to the callback until shutdown is requested.
      def setup_dispatch_thread
        @thread ||= Thread.new do
          while running?
            args = @queue.pop
            break if args == KILL_TOKEN
            begin
              callback.call(*args)
            rescue Exception => e
              # Deliberately broad: a misbehaving callback must not take down
              # the dispatch thread. Only names available in this class may be
              # referenced here, otherwise the handler itself would raise.
              logger.error { "#{self.class} caught error in callback: #{callback.inspect}" }
              logger.error { e.to_std_format }
            end
          end
        end
      end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
zk-1.2.0 lib/zk/threaded_callback.rb
zk-1.1.1 lib/zk/threaded_callback.rb
zk-1.1.0 lib/zk/threaded_callback.rb