Sha256: 1b84c6b8057e9b15c3e1090b8dc4c80ae52458fa19400cbd5162c1542dfa2eee

Contents?: true

Size: 1.13 KB

Versions: 1

Compression:

Stored size: 1.13 KB

Contents

require 'lps'

class Proco
# @private
class Dispatcher
  include Proco::MT::Threaded

  def initialize proco, thread_pool, block
    super()

    @logger, interval, qs, batch, batch_size =
      proco.options.values_at :logger, :interval, :queue_size, :batch, :batch_size
    @queue = if batch && batch_size
               Proco::Queue::BatchQueue.new(qs, batch_size)
             elsif batch
               Proco::Queue::MultiQueue.new(qs)
             else
               Proco::Queue::SingleQueue.new(qs)
             end
    @pool  = thread_pool
    @block = block

    spawn do
      future = items = nil
      LPS.interval(interval).while {
        future, items = @queue.take
        future # JRuby bug
      }.loop do
        inner_loop future, items
      end
    end
  end

  def push *items
    @queue.push(*items)
  end

  def exit
    @queue.invalidate
    super
  end

private
  def inner_loop future, items
    @pool.assign do
      future.update do
        begin
          @block.call items
        rescue Exception => e
          error e.to_s # TODO
          raise
        end
      end
    end

    future
  end
end#Dispatcher
end#Proco

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
proco-0.0.1 lib/proco/dispatcher.rb