Sha256: 7864401626fce297d501547c17c139c280715235b275062e0cff649a94ef3add

Contents?: true

Size: 1.94 KB

Versions: 73

Compression:

Stored size: 1.94 KB

Contents

# -*- coding: utf-8 -*-
require "spec_helper"

unless ENV["CI"]
  require "bunny/concurrent/condition"
  require "bunny/test_kit"

  describe "Long running [relatively to heartbeat interval] consumer that never publishes" do
    let(:connection) do
      c = Bunny.new(:user => "bunny_gem", :password => "bunny_password", :vhost => "bunny_testbed", :automatic_recovery => false, :heartbeat_interval => 6)
      c.start
      c
    end

    after :all do
      connection.close
    end

    let(:target) { 512 * 1024 * 1024 }
    let(:queue)  { "bunny.stress.long_running_consumer.#{Time.now.to_i}" }

    let(:rate) { 50 }
    let(:s)    { 4.0 }



    it "does not skip heartbeats" do
      finished = Bunny::Concurrent::Condition.new

      ct = Thread.new do
        t  = 0
        ch = connection.create_channel(nil, 6)
        begin
          q  = ch.queue(queue, :exclusive => true)

          q.bind("amq.fanout").subscribe do |_, _, payload|
            t += payload.bytesize

            if t >= target
              puts "Hit the target, done with the test..."

              finished.notify_all
            else
              puts "Consumed #{(t.to_f / target.to_f).round(3) * 100}% of data"
            end
          end
        rescue Interrupt => e
          ch.maybe_kill_consumer_work_pool!
          ch.close
        end
      end
      ct.abort_on_exception = true

      pt = Thread.new do
        t  = 0
        ch = connection.create_channel
        begin
          x  = ch.fanout("amq.fanout")

          loop do
            break if t >= target

            rate.times do |i|
              msg = Bunny::TestKit.message_in_kb(96, 8192, i)
              x.publish(msg)
              t += msg.bytesize
            end

            sleep (s * rand)
          end
        rescue Interrupt => e
          ch.close
        end
      end
      pt.abort_on_exception = true

      finished.wait

      ct.raise Interrupt.new
      pt.raise Interrupt.new
    end
  end
end

Version data entries

73 entries across 73 versions & 1 rubygems

Version Path
bunny-1.7.1 spec/stress/long_running_consumer_spec.rb
bunny-1.7.0 spec/stress/long_running_consumer_spec.rb
bunny-1.6.3 spec/stress/long_running_consumer_spec.rb
bunny-1.6.2 spec/stress/long_running_consumer_spec.rb
bunny-1.6.1 spec/stress/long_running_consumer_spec.rb
bunny-1.6.0 spec/stress/long_running_consumer_spec.rb
bunny-1.5.1 spec/stress/long_running_consumer_spec.rb
bunny-1.6.0.rc2 spec/stress/long_running_consumer_spec.rb
bunny-1.6.0.rc1 spec/stress/long_running_consumer_spec.rb
bunny-1.6.0.pre1 spec/stress/long_running_consumer_spec.rb
bunny-1.5.0 spec/stress/long_running_consumer_spec.rb
bunny-1.5.0.pre2 spec/stress/long_running_consumer_spec.rb
bunny-1.5.0.pre1 spec/stress/long_running_consumer_spec.rb
bunny-1.4.1 spec/stress/long_running_consumer_spec.rb
bunny-1.4.0 spec/stress/long_running_consumer_spec.rb
bunny-1.3.1 spec/stress/long_running_consumer_spec.rb
bunny-1.3.0 spec/stress/long_running_consumer_spec.rb
bunny-1.2.2 spec/stress/long_running_consumer_spec.rb
bunny-1.2.1 spec/stress/long_running_consumer_spec.rb
bunny-1.2.0 spec/stress/long_running_consumer_spec.rb