Sha256: a5ab8e396311ebf7099f9e6167213227c64e8031c84029601e87b28d3ab22235

Contents?: true

Size: 1.64 KB

Versions: 73

Compression:

Stored size: 1.64 KB

Contents

# -*- coding: utf-8 -*-
require "spec_helper"

unless ENV["CI"]
  describe "Concurrent consumers sharing a connection" do
    # One Bunny connection shared by every example in this group.
    #
    # It is opened in a before(:all) hook and kept in an instance variable
    # instead of a `let`: `let` values are memoized per example, and RSpec does
    # not support reading them from (:all) hooks, so an after(:all) hook cannot
    # reach the connection an example used through a `let`.
    before :all do
      @connection = Bunny.new(:user => "bunny_gem", :password => "bunny_password", :vhost => "bunny_testbed",
                              :automatic_recovery => false, :continuation_timeout => 6000)
      @connection.start
    end

    # Close the shared connection once every example in the group has run.
    # The guard covers the case where Bunny.new itself raised in the hook above.
    after :all do
      @connection.close if @connection
    end

    # Lets examples keep referring to the shared connection as `connection`.
    attr_reader :connection

    # Tells whether at least one of the given queues still holds messages.
    #
    # @param qs [Enumerable] objects responding to +message_count+
    # @return [Boolean] true if any +message_count+ is non-zero, false when
    #   every count is zero or +qs+ is empty
    def any_not_drained?(qs)
      everything_drained = qs.all? do |queue|
        queue.message_count.zero?
      end

      !everything_drained
    end

    context "when publishing thousands of messages over 128K in size" do
      # Routing keys used both for queue bindings and for published messages.
      let(:colors) { ["red", "blue", "white"] }

      # Number of consumers; each one gets its own channel and its own queue.
      let(:n) { 32 }
      # Messages per publishing batch; five batches are published in total.
      let(:m) { 1000 }

      # NOTE(review): the context name mentions messages over 128K in size, but
      # the body published below is a four-character string -- confirm which
      # of the two is intended.
      it "successfully drain all queues" do
        ch   = connection.create_channel
        body = "абвг"
        x    = ch.topic("bunny.stress.concurrent.consumers.topic", :durable => true)

        # Channels are opened up front on this thread; every consumer thread
        # then works with its own channel.
        chs = Array.new(n) { connection.create_channel }

        # Each thread declares an exclusive server-named queue, binds it to the
        # topic exchange under a random color and subscribes a no-op consumer.
        # The queue is the thread's return value, so no collection is mutated
        # from several threads at once.
        consumers = chs.map do |cht|
          t = Thread.new do
            q = cht.queue("", :exclusive => true)
            q.bind(x.name, :routing_key => colors.sample).subscribe do |_delivery_info, _meta, _payload|
              # no-op
            end
            q
          end
          t.abort_on_exception = true
          t
        end

        # Thread#value joins the thread before returning its result, so once
        # this line completes every consumer thread has finished its setup and
        # `qs` is guaranteed to hold all n queues (no fixed sleep needed).
        qs = consumers.map(&:value)

        5.times do |i|
          m.times do
            x.publish(body, :routing_key => colors.sample)
          end
          puts "Published #{(i + 1) * m} messages..."
        end

        # Poll once per second until every queue reports zero messages.
        sleep 1.0 while any_not_drained?(qs)
        puts "Drained all the queues..."

        ch.close
      end
    end
  end
end

Version data entries

73 entries across 73 versions & 1 rubygem

Version Path
bunny-1.7.1 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.7.0 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.6.3 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.6.2 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.6.1 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.6.0 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.5.1 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.6.0.rc2 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.6.0.rc1 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.6.0.pre1 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.5.0 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.5.0.pre2 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.5.0.pre1 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.4.1 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.4.0 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.3.1 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.3.0 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.2.2 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.2.1 spec/stress/concurrent_consumers_stress_spec.rb
bunny-1.2.0 spec/stress/concurrent_consumers_stress_spec.rb