Sha256: 088b1128182168057c442c8beeb8cbb29f23378328559b8f87826662918e92ef
Contents?: true
Size: 1.08 KB
Versions: 2
Compression:
Stored size: 1.08 KB
Contents
require 'celluloid' require 'celluloid/io' require 'celluloid/autostart' require 'bunny' class Worker include Celluloid::IO finalizer :shutdown def initialize(conn) @ch = conn.create_channel q = @ch.queue('task_queue', :durable => true) q.subscribe manual_ack: true, &method(:on_event) p 'worker accepting connections' end def on_event(delivery_info, properties, body) p "#{Time.now.strftime('%FT %T.%L')} started work: #{body}" sleep(10) p "#{Time.now.strftime('%FT %T.%L')} completed work: #{body}" @ch.ack(delivery_info.delivery_tag) end def shutdown p 'trying to shutdown...' @ch.close p 'hey shutdown' end end class Consumer def initialize(options = {}) @size = (ENV['POOL'] || 2).to_i @connection = Bunny.new(options) @connection.start @workers = @size.times.map{ Worker.new(@connection) } end def close_connection futures = @workers.map { |w| w.future(:finalize) } @connection.close if futures.all? end end begin @consumer = Consumer.new rescue Interrupt => _ @consumer.close_connection end sleep
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
asynk-0.0.2 | consumer.rb |
asynk-0.0.1 | consumer.rb |