Sha256: 1b84c6b8057e9b15c3e1090b8dc4c80ae52458fa19400cbd5162c1542dfa2eee
Contents?: true
Size: 1.13 KB
Versions: 1
Compression:
Stored size: 1.13 KB
Contents
require 'lps'

class Proco
  # @private
  # Owns a queue of pushed items. Inside the block handed to `spawn`, it
  # repeatedly takes a (future, items) pair off that queue and schedules the
  # handler block on the thread pool for each pair.
  class Dispatcher
    include Proco::MT::Threaded

    # @param proco [#options] object whose `options` supplies :logger,
    #   :interval, :queue_size, :batch and :batch_size
    # @param thread_pool [#assign] pool on which each handler run is scheduled
    # @param block [#call] handler invoked with the items taken from the queue
    def initialize(proco, thread_pool, block)
      super()

      @logger, poll_interval, capacity, batch, batch_size =
        proco.options.values_at(:logger, :interval, :queue_size, :batch, :batch_size)

      @pool  = thread_pool
      @block = block

      # Queue flavour: no batching -> SingleQueue; batching with a batch size
      # -> BatchQueue; batching without a size -> MultiQueue.
      @queue =
        if !batch
          Proco::Queue::SingleQueue.new(capacity)
        elsif batch_size
          Proco::Queue::BatchQueue.new(capacity, batch_size)
        else
          Proco::Queue::MultiQueue.new(capacity)
        end

      spawn do
        future = nil
        items  = nil

        # The condition block refills future/items on every evaluation and
        # yields future as its value.
        # NOTE(review): presumably LPS stops looping once this is falsy,
        # e.g. after the queue has been invalidated -- confirm against lps.
        poller = LPS.interval(poll_interval).while {
          future, items = @queue.take
          future # JRuby bug
        }

        poller.loop do
          inner_loop(future, items)
        end
      end
    end

    # Forwards all given items to the underlying queue.
    #
    # @param items [Array] items to enqueue
    # @return whatever the queue's `push` returns
    def push(*items)
      @queue.push(*items)
    end

    # Invalidates the queue, then defers to the inherited `exit`.
    def exit
      @queue.invalidate
      super
    end

    private

    # Schedules one handler run on the pool and returns the future.
    #
    # future and items are method parameters here, so the block given to the
    # pool closes over this call's values rather than over the locals that the
    # polling loop keeps reassigning.
    #
    # @param future [#update] future updated with the outcome of the handler
    # @param items items passed to the handler block
    # @return the same future that was passed in
    def inner_loop(future, items)
      @pool.assign do
        future.update do
          begin
            @block.call(items)
          rescue Exception => err
            # Log and re-raise so the failure still propagates out of the
            # update block. `error` is presumably a logging helper provided
            # by the Threaded mixin -- confirm.
            error(err.to_s) # TODO
            raise
          end
        end
      end

      future
    end
  end # Dispatcher
end # Proco
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
proco-0.0.1 | lib/proco/dispatcher.rb |