Sha256: 76e0101e8b1cb283e3828562273e11affa716f0e906f8936d93a9749a63fcaf4

Contents?: true

Size: 1.16 KB

Versions: 1

Compression:

Stored size: 1.16 KB

Contents

require 'lps'

class Proco
# Takes queued items off an internal queue and hands them to a thread pool,
# where the user-supplied block is invoked with them.
#
# @private
class Dispatcher
  include Proco::MT::Threaded

  # Builds the queue selected by the Proco options and starts the dispatch
  # loop through `spawn`.
  #
  # @param proco       object whose `options` supplies :logger, :interval,
  #                    :queue_size, :batch and :batch_size
  # @param thread_pool pool that receives work through `assign`
  # @param block       callable invoked with the items taken from the queue
  def initialize(proco, thread_pool, block)
    super()

    opts = proco.options
    @logger, interval, queue_size, batch, batch_size =
      opts.values_at(:logger, :interval, :queue_size, :batch, :batch_size)

    # Queue flavour: batching with a size limit, batching without one,
    # or no batching at all.
    @queue =
      if batch
        if batch_size
          Proco::Queue::BatchQueue.new(queue_size, batch_size, interval)
        else
          Proco::Queue::MultiQueue.new(queue_size, interval)
        end
      else
        Proco::Queue::SingleQueue.new(queue_size, interval)
      end
    @pool  = thread_pool
    @block = block

    # NOTE(review): `spawn` is presumably provided by Proco::MT::Threaded
    # and runs this block on the dispatcher's own thread -- confirm.
    spawn do
      future = nil
      items  = nil

      # The condition block takes the next (future, items) pair and
      # evaluates to the future.
      poller = LPS.interval(interval).while {
        future, items = @queue.take
        future # JRuby bug
      }
      poller.loop { inner_loop(future, items) }
    end
  end

  # Forwards the given items to the underlying queue.
  #
  # @param items one or more items to enqueue
  # @return whatever the queue's `push` returns
  def push(*items)
    @queue.push(*items)
  end

  # Invalidates the queue first, then defers to the inherited `exit`.
  def exit
    @queue.invalidate
    super
  end

private

  # Schedules one unit of work on the pool: the future is updated with the
  # result of running the user block against `items`.
  #
  # @param future object updated (via `update`) with the block's outcome
  # @param items  items taken from the queue
  # @return the same future that was passed in
  def inner_loop(future, items)
    @pool.assign do
      future.update { invoke_block(items) }
    end

    future
  end

  # Runs the user block with `items`. Any exception is reported through
  # `error` and then re-raised unchanged.
  def invoke_block(items)
    @block.call(items)
  rescue Exception => e
    error e.to_s # TODO
    raise
  end
end#Dispatcher
end#Proco

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
proco-0.0.2 lib/proco/dispatcher.rb