Sha256: 088b1128182168057c442c8beeb8cbb29f23378328559b8f87826662918e92ef

Contents?: true

Size: 1.08 KB

Versions: 2

Compression:

Stored size: 1.08 KB

Contents

require 'celluloid'
require 'celluloid/io'
require 'celluloid/autostart'
require 'bunny'

class Worker
  include Celluloid::IO
  finalizer :shutdown

  def initialize(conn)
    @ch = conn.create_channel
    q = @ch.queue('task_queue', :durable => true)
    q.subscribe manual_ack: true, &method(:on_event)
    p 'worker accepting connections'
  end

  def on_event(delivery_info, properties, body)
    p "#{Time.now.strftime('%FT %T.%L')} started work: #{body}"
    sleep(10)
    p "#{Time.now.strftime('%FT %T.%L')} completed work: #{body}"
    @ch.ack(delivery_info.delivery_tag)
  end

  def shutdown
    p 'trying to shutdown...'
    @ch.close
    p 'hey shutdown'
  end
end

class Consumer
  def initialize(options = {})
    @size = (ENV['POOL'] || 2).to_i
    @connection = Bunny.new(options)
    @connection.start
    @workers = @size.times.map{ Worker.new(@connection) }
  end

  def close_connection
    futures = @workers.map { |w| w.future(:finalize) }
    @connection.close if futures.all?
  end
end

begin
  @consumer = Consumer.new
rescue Interrupt => _
  @consumer.close_connection
end

sleep

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
asynk-0.0.2 consumer.rb
asynk-0.0.1 consumer.rb