Sha256: e3fe17cb040af7724508f50ad1484a2933d3f6faeaac714a8daff382c245cad3
Contents?: true
Size: 1.33 KB
Versions: 23
Compression:
Stored size: 1.33 KB
Contents
require "thread"

module Bunny
  # Thread pool that dispatches consumer deliveries. Not supposed to be shared between channels
  # or threads.
  #
  # Jobs are callables pushed onto an internal queue; each worker thread pops
  # one job at a time and invokes it with no arguments.
  #
  # @private
  class ConsumerWorkPool

    #
    # API
    #

    # @return [Integer] number of worker threads spawned by {#start}
    attr_reader :size

    # @param size [Integer] number of worker threads to spawn on {#start}
    def initialize(size = 1)
      @size    = size
      @queue   = ::Queue.new
      # Initialised eagerly so that #join, #pause, #resume, #kill and #running?
      # are safe to call on a pool that has not been started yet.
      @threads = []
      @running = false
      @paused  = false
      # Guards @paused; workers block on @resumed while the pool is paused.
      @pause_lock = ::Mutex.new
      @resumed    = ::ConditionVariable.new
    end

    # Enqueues a job. The job is whichever of +callable+ or the block is given
    # (+callable+ wins when both are present) and must respond to +call+.
    #
    # @param callable [#call, nil] job to run on a worker thread
    def submit(callable = nil, &block)
      @queue.push(callable || block)
    end

    # Spawns +size+ worker threads, each running the work loop, and marks the
    # pool as running.
    def start
      @threads = []

      @size.times do
        @threads << Thread.new { run_loop }
      end

      @running = true
    end

    # @return [Boolean] true between {#start}/{#resume} and the next
    #   {#shutdown}/{#pause}/{#kill}
    def running?
      @running
    end

    # Marks the pool as stopped and enqueues one terminate marker per worker.
    # Markers are ordinary queue entries, so jobs submitted earlier are
    # processed first, and paused workers do not see them until {#resume}.
    def shutdown
      @running = false

      @size.times do
        submit { throw :terminate }
      end
    end

    # Blocks until every worker thread has finished.
    def join
      @threads.each { |t| t.join }
    end

    # Asks the workers to stop picking up jobs. A worker checks the flag
    # before each dequeue, so a worker that is already blocked waiting for a
    # job, or is in the middle of one, completes that job before it pauses.
    def pause
      @pause_lock.synchronize do
        @running = false
        @paused  = true
      end
    end

    # Clears the paused flag and wakes every worker waiting on it.
    def resume
      @pause_lock.synchronize do
        @running = true
        @paused  = false
        @resumed.broadcast
      end
    end

    # Marks the pool as stopped and kills every worker thread immediately,
    # without draining the queue.
    def kill
      @running = false

      @threads.each { |t| t.kill }
    end

    protected

    # Body of each worker thread: pop a job, call it, repeat, until a job
    # throws +:terminate+ (see #shutdown).
    def run_loop
      catch(:terminate) do
        loop do
          wait_while_paused
          callable = @queue.pop

          begin
            callable.call
          rescue ::StandardError => e
            # Only StandardError is rescued so that SystemExit, Interrupt and
            # other process-level exceptions are not swallowed by the worker.
            # TODO: report through a logger instead of printing.
            $stderr.puts e.class.name
            $stderr.puts e.message
          end
        end
      end
    end

    # Blocks the calling worker while the pool is paused. The flag is
    # re-checked in a loop under the lock, so neither a spurious wakeup nor a
    # #resume issued just before the wait can leave the worker stuck or let it
    # slip through while still paused.
    def wait_while_paused
      @pause_lock.synchronize do
        @resumed.wait(@pause_lock) while @paused
      end
    end
  end
end
Version data entries
23 entries across 23 versions & 1 rubygems
Version | Path |
---|---|
bunny-0.9.1 | lib/bunny/consumer_work_pool.rb |
bunny-0.9.0 | lib/bunny/consumer_work_pool.rb |
bunny-0.9.0.rc2 | lib/bunny/consumer_work_pool.rb |