Sha256: 76e0101e8b1cb283e3828562273e11affa716f0e906f8936d93a9749a63fcaf4
Contents?: true
Size: 1.16 KB
Versions: 1
Compression:
Stored size: 1.16 KB
Contents
require 'lps'

class Proco
  # Takes work from an internal queue and hands it to a thread pool, where
  # the supplied block is called with the taken items.
  #
  # @private
  class Dispatcher
    include Proco::MT::Threaded

    # Builds the queue selected by the owner's options and spawns the
    # polling loop.
    #
    # @param proco object whose `options` hash supplies :logger, :interval,
    #   :queue_size, :batch and :batch_size
    # @param thread_pool object responding to `assign` with a block
    # @param block callable invoked with the items taken from the queue
    def initialize proco, thread_pool, block
      super()

      @logger, interval, queue_size, batch_mode, batch_size =
        proco.options.values_at :logger, :interval, :queue_size, :batch, :batch_size

      # Pick the queue class and its constructor arguments first, then
      # instantiate once. Only the selected branch's constant is resolved.
      queue_class, queue_args =
        if batch_mode && batch_size
          [Proco::Queue::BatchQueue, [queue_size, batch_size, interval]]
        elsif batch_mode
          [Proco::Queue::MultiQueue, [queue_size, interval]]
        else
          [Proco::Queue::SingleQueue, [queue_size, interval]]
        end
      @queue = queue_class.new(*queue_args)

      @pool  = thread_pool
      @block = block

      spawn do
        # Declared outside both blocks so the condition block and the loop
        # body share the same variables.
        pending = payload = nil

        poller = LPS.interval(interval).while do
          pending, payload = @queue.take
          # Return the future explicitly rather than the value of the
          # multiple assignment; the original author marked this as a
          # JRuby bug workaround.
          pending
        end

        poller.loop do
          inner_loop pending, payload
        end
      end
    end

    # Forwards the given items to the underlying queue.
    #
    # @param items items to enqueue
    # @return whatever the queue's `push` returns
    def push *items
      @queue.push(*items)
    end

    # Invalidates the queue, then defers to the inherited `exit`.
    def exit
      @queue.invalidate
      super
    end

    private

    # Schedules one invocation of the user block on the thread pool and
    # routes its outcome through `future.update`.
    #
    # @param future object responding to `update` with a block
    # @param items items passed to the user block
    # @return the same future that was passed in
    def inner_loop future, items
      @pool.assign do
        future.update do
          begin
            @block.call items
          rescue Exception => err
            # Log the failure, then re-raise so `future.update` still sees it.
            # `error` is not defined in this file; presumably it comes from
            # the Threaded mixin -- confirm.
            error err.to_s # TODO
            raise
          end
        end
      end
      future
    end
  end # Dispatcher
end # Proco
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
proco-0.0.2 | lib/proco/dispatcher.rb |