Sha256: 7f21d84a0ec224d83b2ba667ca74b85217e16980819baf4538c7afc7648a5d82

Contents?: true

Size: 1.66 KB

Versions: 2

Compression:

Stored size: 1.66 KB

Contents

# Dispatches batches of events to a list of handlers and runs a background
# "observer" thread that waits on the handlers' timers.
class FluQ::Worker
  include Celluloid
  include FluQ::Mixins::Loggable
  finalizer :finalize

  attr_reader :prefix, :handlers

  # @param prefix label prepended to this worker's log lines
  # @param [Array<Class,Array>] handlers handler builders; each entry is a
  #   handler class optionally followed by its constructor arguments
  def initialize(prefix, handlers = [])
    @prefix   = prefix
    # Must be assigned before the observer starts: the observer thread reads
    # the handler list through #next_timers.
    @handlers = []
    @observer = observe

    handlers.each do |klass, *args|
      add klass, *args
    end
  end

  # Passes the events to every registered handler, in registration order.
  #
  # @param [Array<FluQ::Event>] events to process
  # @return [true]
  def process(events)
    events.freeze # Freeze events, don't allow individual handlers to modify them
    handlers.each do |handler|
      on_events(handler, Time.now, events)
    end
    true
  end

  # Adds a handler
  # @param [Class<FluQ::Handler::Base>] klass handler class
  # @param [multiple] args handler initialize arguments
  # @return the newly built handler instance
  def add(klass, *args)
    handler = klass.new(*args)
    handlers.push handler
    handler
  end

  protected

    # Stops the observer thread; registered above as the Celluloid finalizer.
    def finalize
      @observer.kill if @observer
    end

    # Hands the events accepted by the handler's filter to that handler and
    # logs how many were processed and how long it took.
    #
    # @param handler the handler to invoke
    # @param [Time] start reference time used for the logged duration
    # @param [Array<FluQ::Event>] events the full, unfiltered batch
    def on_events(handler, start, events)
      matching = handler.filter(events)
      return if matching.empty? # nothing for this handler, skip the call and the log line

      # Bounded by the handler's configured timeout. Per Ruby's Timeout docs a
      # nil (or 0) value runs the block without a time limit.
      ::Timeout::timeout handler.config[:timeout] do
        handler.on_events(matching)
        logger.info { "#{prefix}:#{handler.name} #{matching.size}/#{events.size} events in #{((Time.now - start) * 1000).round}ms" }
      end
    end

  private

    # Picks the timer group that is due soonest.
    #
    # @return the non-empty handler timers with the smallest wait interval,
    #   or nil when no handler has any timers
    def next_timers
      handlers.map(&:timers).reject(&:empty?).min_by(&:wait_interval)
    end

    # Starts the background thread that waits on the next due timers, or naps
    # for 100ms when there are none, and forwards any StandardError to the
    # thread that created it.
    #
    # NOTE(review): the loop keeps running after raising into the parent, so a
    # persistent failure is re-raised on every iteration - confirm this is
    # intended.
    #
    # @return [Thread] the observer thread
    def observe
      parent = Thread.current
      Thread.new do
        loop do
          begin
            timers = next_timers
            timers ? timers.wait : sleep(0.1)
          rescue => e
            parent.raise(e)
          end
        end
      end
    end

end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
fluq-0.8.1 lib/fluq/worker.rb
fluq-0.8.0 lib/fluq/worker.rb