Sha256: 8f004c9b0aebb2e6bad11e8c00ccb2623233f3b5b5871fd92c45519e527fa7ef

Contents?: true

Size: 1.47 KB

Versions: 2

Compression:

Stored size: 1.47 KB

Contents

# -*- coding: utf-8 -*-
require "spec_helper"

describe "Concurrent consumers sharing a connection" do
  # Lazily builds and starts a Bunny connection to the "bunny_testbed" vhost,
  # with automatic recovery switched off. The started connection is returned.
  let(:connection) do
    Bunny.new(
      :user               => "bunny_gem",
      :password           => "bunny_password",
      :vhost              => "bunny_testbed",
      :automatic_recovery => false
    ).tap { |session| session.start }
  end

  # `connection` is a per-example `let`, so it has to be closed in a
  # per-example hook. An `after :all` hook runs outside any example and cannot
  # see the memoized value: depending on the RSpec version it either raises or
  # builds a brand-new connection just to close it, leaving the one the
  # example actually used open.
  after :each do
    connection.close
  end

  # Tells whether at least one of the given queues still holds messages.
  #
  # @param qs [Enumerable] objects responding to #message_count
  # @return [Boolean] false when every queue reports a zero message count
  #   (including when +qs+ is empty), true otherwise
  def any_not_drained?(qs)
    everything_empty = qs.all? { |queue| queue.message_count.zero? }
    !everything_empty
  end

  # Stress scenario: n consumers, each on its own channel of the shared
  # connection, bind a server-named exclusive queue to one topic exchange with
  # a randomly picked routing key. 5 * m messages are then published with
  # random routing keys and the example waits until every queue is empty.
  #
  # NOTE(review): the context name mentions messages over 128K in size, but the
  # payload published below is a four-character string - confirm which one is
  # intended.
  context "when publishing thousands of messages over 128K in size" do
    let(:colors) { ["red", "blue", "white"] }

    let(:n) { 32 }
    let(:m) { 1000 }

    it "successfully drain all queues" do
      ch   = connection.create_channel
      body = "абвг"
      x    = ch.topic("bunny.stress.concurrent.consumers.topic", :durable => true)

      # Resolve the memoized helpers on the main thread so the worker threads
      # below only read plain local variables.
      routing_keys   = colors
      consumer_count = n

      chs = {}
      consumer_count.times do |i|
        chs[i] = connection.create_channel
      end

      subscribers = (0...consumer_count).map do |i|
        Thread.new do
          q = chs[i].queue("", :exclusive => true)
          q.bind(x.name, :routing_key => routing_keys.sample).subscribe do |_delivery_info, _meta, _payload|
            # no-op
          end
          # Hand the queue back as the thread's value instead of appending to
          # an array shared by all threads.
          q
        end
      end

      # Thread#value joins each thread and re-raises anything it raised, so
      # publishing only starts once every consumer is set up, the queue list
      # is complete, and a failed setup fails the example.
      qs = subscribers.map { |t| t.value }

      5.times do |i|
        m.times do
          x.publish(body, :routing_key => routing_keys.sample)
        end
        puts "Published #{(i + 1) * m} messages..."
      end

      # Poll once per second until no queue reports pending messages.
      while any_not_drained?(qs)
        sleep 1.0
      end
      puts "Drained all the queues..."

      ch.close
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
bunny-0.9.0.pre9 spec/stress/concurrent_consumers_stress_spec.rb
bunny-0.9.0.pre8 spec/stress/concurrent_consumers_stress_spec.rb