#
# Copyright (c) 2009-2012 RightScale Inc
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

require 'json'

require File.expand_path(File.join(File.dirname(__FILE__), '..', 'spec_helper'))
require File.expand_path(File.join(File.dirname(__FILE__), '..', '..', 'lib', 'right_amqp'))

# Empty stand-in class that mocked "push" packets report as their :class
class PushMock
end
# Empty stand-in class that mocked "request" packets report as their :class
class RequestMock
end

describe RightAMQP::HABrokerClient do

  include FlexMock::ArgumentTypes
  include RightAMQP::SpecHelper

  # Shared fixture for every example in this file: a mock packet and a mock
  # serializer that maps the packet to "message" and back, plus the stats
  # classes that examples expect to see passed to BrokerClient.new.
  before(:each) do
    # presumably provided by RightAMQP::SpecHelper and the source of @logger
    # used by later examples -- TODO confirm
    setup_logger
    @exceptions = RightSupport::Stats::Exceptions
    @non_deliveries = RightSupport::Stats::Activity
    @message = "message"
    @packet = flexmock("packet", :class => RequestMock, :to_s => true, :version => [12, 12]).by_default
    @serializer = flexmock("serializer")
    # by_default lets individual examples override these stubs with their own expectations
    @serializer.should_receive(:dump).and_return(@message).by_default
    @serializer.should_receive(:load).with(@message).and_return(@packet).by_default
  end

  describe "Context" do

    # @packet1 answers every attribute that the examples read from a Context;
    # @packet2 answers only :class and :name
    before(:each) do
      @brokers = ["broker"]
      @options = {:option => "option"}
      @packet1 = flexmock("packet1", :class => RequestMock, :name => "request", :type => "type1",
                          :from => "from1", :token => "token1", :one_way => false)
      @packet2 = flexmock("packet2", :class => FlexMock, :name => "flexmock")
    end

    it "should initialize context" do
      ctx = RightAMQP::HABrokerClient::Context.new(@packet1, @options, @brokers)
      ctx.name.should == "request"
      ctx.type.should == "type1"
      ctx.from.should == "from1"
      ctx.token.should == "token1"
      ctx.one_way.should be_false
      ctx.options.should == @options
      ctx.brokers.should == @brokers
      ctx.failed.should == []
    end

    it "should treat type, from, token, and one_way as optional members of packet but default one_way to true" do
      ctx = RightAMQP::HABrokerClient::Context.new(@packet2, @options, @brokers)
      ctx.name.should == "flexmock"
      ctx.type.should be_nil
      ctx.from.should be_nil
      ctx.token.should be_nil
      ctx.one_way.should be_true
      ctx.options.should == @options
      ctx.brokers.should == @brokers
      ctx.failed.should == []
    end

    it "should store identity of failed brokers" do
      ctx = RightAMQP::HABrokerClient::Context.new(@packet1, @options, @brokers)
      failures = ["rs-broker-1-1", "rs-broker-2-2"]
      # Failures are recorded one at a time, in order
      failures.each { |identity| ctx.record_failure(identity) }
      ctx.failed.should == failures
    end

  end

  # Exercises RightAMQP::HABrokerClient::Published, the cache of published
  # messages keyed by message digest. Internal @cache and @lru state is
  # inspected directly through instance_variable_get.
  describe "Caching" do

    require 'digest/md5'

    # Pins Time.now so stored timestamps are predictable, and prepares three
    # messages with their expected MD5 keys and matching contexts
    before(:each) do
      @now = Time.at(1000000)
      @max_age = RightAMQP::HABrokerClient::Published::MAX_AGE
      flexmock(Time).should_receive(:now).and_return(@now).by_default
      @published = RightAMQP::HABrokerClient::Published.new
      @message1 = JSON.dump({:data => "a message"})
      @key1 = Digest::MD5.hexdigest(@message1)
      @message2 = JSON.dump({:data => "another message"})
      @key2 = Digest::MD5.hexdigest(@message2)
      @message3 = JSON.dump({:data => "yet another message"})
      @key3 = Digest::MD5.hexdigest(@message3)
      @packet1 = flexmock("packet1", :class => RequestMock, :name => "request", :type => "type1",
                          :from => "from1", :token => "token1", :one_way => false)
      @packet2 = flexmock("packet2", :class => RequestMock, :name => "request", :type => "type2",
                          :from => "from2", :token => "token2", :one_way => false)
      @packet3 = flexmock("packet3", :class => PushMock, :name => "push", :type => "type3",
                          :from => "from3", :token => "token3", :one_way => true)
      @brokers = ["broker"]
      @options = {:option => "option"}
      @context1 = RightAMQP::HABrokerClient::Context.new(@packet1, @options, @brokers)
      @context2 = RightAMQP::HABrokerClient::Context.new(@packet2, @options, @brokers)
      @context3 = RightAMQP::HABrokerClient::Context.new(@packet3, @options, @brokers)
    end

    it "should use message signature as cache hash key if it has one" do
      @published.identify(@message1).should == @key1
      @published.identify(@message2).should == @key2
      @published.identify(@message3).should == @key3
    end

    it "should store message info" do
      @published.store(@message1, @context1)
      @published.instance_variable_get(:@cache)[@key1].should == [@now.to_i, @context1]
      @published.instance_variable_get(:@lru).should == [@key1]
    end

    it "should update timestamp and lru list when store to existing entry" do
      @published.store(@message1, @context1)
      @published.instance_variable_get(:@cache)[@key1].should == [@now.to_i, @context1]
      @published.instance_variable_get(:@lru).should == [@key1]
      @published.store(@message2, @context2)
      @published.instance_variable_get(:@lru).should == [@key1, @key2]
      # Advance the clock to one second short of MAX_AGE so nothing ages out
      later = @now + @max_age - 1
      flexmock(Time).should_receive(:now).and_return(later)
      @published.store(@message1, @context1)
      @published.instance_variable_get(:@cache)[@key1].should == [later.to_i, @context1]
      @published.instance_variable_get(:@lru).should == [@key2, @key1]
    end

    it "should remove old cache entries when store new one" do
      @published.store(@message1, @context1)
      @published.store(@message2, @context2)
      (@published.instance_variable_get(:@cache).keys - [@key1, @key2]).should == []
      @published.instance_variable_get(:@lru).should == [@key1, @key2]
      # Advance the clock past MAX_AGE so the first two entries are stale
      flexmock(Time).should_receive(:now).and_return(@now + @max_age + 1)
      @published.store(@message3, @context3)
      @published.instance_variable_get(:@cache).keys.should == [@key3]
      @published.instance_variable_get(:@lru).should == [@key3]
    end

    it "should fetch message info and make it the most recently used" do
      @published.store(@message1, @context1)
      @published.store(@message2, @context2)
      @published.instance_variable_get(:@lru).should == [@key1, @key2]
      @published.fetch(@message1).should == @context1
      @published.instance_variable_get(:@lru).should == [@key2, @key1]
    end

    # Description corrected: the assertions expect nil, not an empty hash
    it "should fetch nil if entry not found" do
      @published.fetch(@message1).should be_nil
      @published.store(@message1, @context1)
      @published.fetch(@message1).should_not be_nil
      @published.fetch(@message2).should be_nil
    end

  end # Caching

  context "when initializing" do

    # Fallback broker double returned by BrokerClient.new unless an example
    # installs a more specific expectation
    before(:each) do
      @identity = "rs-broker-localhost-5672"
      @address = {:host => "localhost", :port => 5672, :index => 0}
      @broker = flexmock("broker_client", :identity => @identity, :usable? => true)
      @broker.should_receive(:update_status).by_default
      flexmock(RightAMQP::BrokerClient).should_receive(:new).and_return(@broker).by_default
    end

    it "should create a broker client for default host and port" do
      flexmock(RightAMQP::BrokerClient).should_receive(:new).
        with(@identity, @address, @serializer, @exceptions, @non_deliveries, Hash, nil).
        and_return(@broker).once
      client = RightAMQP::HABrokerClient.new(@serializer)
      client.brokers.should == [@broker]
    end

    it "should create broker clients for specified hosts and ports and assign index in order of creation" do
      first_address = {:host => "first", :port => 5672, :index => 0}
      first_broker = flexmock("broker_client1", :identity => "rs-broker-first-5672", :usable? => true)
      flexmock(RightAMQP::BrokerClient).should_receive(:new).
        with("rs-broker-first-5672", first_address, @serializer, @exceptions, @non_deliveries, Hash, nil).
        and_return(first_broker).once
      second_address = {:host => "second", :port => 5672, :index => 1}
      second_broker = flexmock("broker_client2", :identity => "rs-broker-second-5672", :usable? => true)
      flexmock(RightAMQP::BrokerClient).should_receive(:new).
        with("rs-broker-second-5672", second_address, @serializer, @exceptions, @non_deliveries, Hash, nil).
        and_return(second_broker).once
      client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second", :port => 5672)
      client.brokers.should == [first_broker, second_broker]
    end

    it "should setup to receive status updates from each broker client" do
      double = flexmock("broker_client", :identity => "rs-broker-first-5672")
      # The options hash handed to BrokerClient.new must carry a Proc callback
      has_callback = on { |opts| opts[:update_status_callback].is_a?(Proc) }
      flexmock(RightAMQP::BrokerClient).should_receive(:new).
        with("rs-broker-first-5672", Hash, @serializer, @exceptions, @non_deliveries, has_callback, nil).
        and_return(double).once
      RightAMQP::HABrokerClient.new(@serializer, :host => "first", :port => 5672)
    end

    it "should setup to receive returned messages from each broker client" do
      double = flexmock("broker_client", :identity => "rs-broker-first-5672")
      # The options hash handed to BrokerClient.new must carry a Proc callback
      has_callback = on { |opts| opts[:return_message_callback].is_a?(Proc) }
      flexmock(RightAMQP::BrokerClient).should_receive(:new).
        with("rs-broker-first-5672", Hash, @serializer, @exceptions, @non_deliveries, has_callback, nil).
        and_return(double).once
      RightAMQP::HABrokerClient.new(@serializer, :host => "first", :port => 5672)
    end

  end # when initializing

  # Covers HABrokerClient.parse_user_data, which the examples expect to return
  # a [hosts, ports] pair built from RS_rn_url / RS_rn_host / RS_rn_port settings
  context "when parsing user_data" do

    it "should extract host list from RS_rn_url and RS_rn_host" do
      RightAMQP::HABrokerClient.parse_user_data("RS_rn_url=rs@first/right_net&RS_rn_host=:0,second:1").should ==
        ["first:0,second:1", nil]
    end

    it "should extract port list from RS_rn_port" do
      RightAMQP::HABrokerClient.parse_user_data("RS_rn_url=rs@host/right_net&RS_rn_host=:1,host:0&RS_rn_port=5673:1,5672:0").should ==
        ["host:1,host:0", "5673:1,5672:0"]
    end

    # Both nil and the empty string count as missing user data
    it "should raise an exception if there is no user data" do
      lambda { RightAMQP::HABrokerClient.parse_user_data(nil) }.should raise_error(RightAMQP::HABrokerClient::NoUserData)
      lambda { RightAMQP::HABrokerClient.parse_user_data("") }.should raise_error(RightAMQP::HABrokerClient::NoUserData)
    end

    it "should raise an exception if there are no broker hosts defined in the data" do
      lambda { RightAMQP::HABrokerClient.parse_user_data("blah") }.should raise_error(RightAMQP::HABrokerClient::NoBrokerHosts)
    end

    it "should translate old host name to standard form" do
      RightAMQP::HABrokerClient.parse_user_data("RS_rn_url=rs@broker.rightscale.com/right_net").should ==
        ["broker1-1.rightscale.com", nil]
    end

  end # when parsing user_data

  # Covers HABrokerClient.addresses, which turns host and port lists into an
  # array of {:host, :port, :index} hashes
  context "when addressing" do

    it "should form list of broker addresses from specified hosts and ports" do
      expected = [{:host => "first", :port => 5672, :index => 0},
                  {:host => "second", :port => 5674, :index => 1}]
      RightAMQP::HABrokerClient.addresses("first,second", "5672, 5674").should == expected
    end

    it "should form list of broker addresses from specified hosts and ports and use ids associated with hosts" do
      expected = [{:host => "first", :port => 5672, :index => 1},
                  {:host => "second", :port => 5674, :index => 2}]
      RightAMQP::HABrokerClient.addresses("first:1,second:2", "5672, 5674").should == expected
    end

    it "should form list of broker addresses from specified hosts and ports and use ids associated with ports" do
      expected = [{:host => "host", :port => 5672, :index => 0},
                  {:host => "host", :port => 5674, :index => 2}]
      RightAMQP::HABrokerClient.addresses("host", "5672:0, 5674:2").should == expected
    end

    it "should use default host and port for broker identity if none provided" do
      expected = [{:host => "localhost", :port => 5672, :index => 0}]
      RightAMQP::HABrokerClient.addresses(nil, nil).should == expected
    end

    it "should use default port when ports is an empty string" do
      expected = [{:host => "first", :port => 5672, :index => 0},
                  {:host => "second", :port => 5672, :index => 1}]
      RightAMQP::HABrokerClient.addresses("first, second", "").should == expected
    end

    it "should use default host when hosts is an empty string" do
      expected = [{:host => "localhost", :port => 5672, :index => 0},
                  {:host => "localhost", :port => 5673, :index => 1}]
      RightAMQP::HABrokerClient.addresses("", "5672, 5673").should == expected
    end

    it "should reuse host if there is only one but multiple ports" do
      expected = [{:host => "first", :port => 5672, :index => 0},
                  {:host => "first", :port => 5674, :index => 1}]
      RightAMQP::HABrokerClient.addresses("first", "5672, 5674").should == expected
    end

    it "should reuse port if there is only one but multiple hosts" do
      expected = [{:host => "first", :port => 5672, :index => 0},
                  {:host => "second", :port => 5672, :index => 1}]
      RightAMQP::HABrokerClient.addresses("first, second", 5672).should == expected
    end

    it "should apply ids associated with host" do
      expected = [{:host => "first", :port => 5672, :index => 0},
                  {:host => "third", :port => 5672, :index => 2}]
      RightAMQP::HABrokerClient.addresses("first:0, third:2", 5672).should == expected
    end

    it "should not allow mismatched number of hosts and ports" do
      # Two hosts against three ports cannot be paired up
      lambda do
        RightAMQP::HABrokerClient.addresses("first, second", "5672, 5673, 5674")
      end.should raise_exception(ArgumentError)
    end

  end # when addressing

  # Covers conversion between broker identities, aliases, hosts, ports and
  # indexes. Most examples build a client for "first:0, third:2", so the
  # "second" broker (index 1, alias "b1") is deliberately unknown to it.
  context "when identifying" do

    # Registers three broker doubles; BrokerClient.new hands back the double
    # whose identity and address match the arguments it is called with
    before(:each) do
      @address1 = {:host => "first", :port => 5672, :index => 0}
      @identity1 = "rs-broker-first-5672"
      @broker1 = flexmock("broker_client1", :identity => @identity1, :usable? => true,
                          :alias => "b0", :host => "first", :port => 5672, :index => 0)
      flexmock(RightAMQP::BrokerClient).should_receive(:new).with(@identity1, @address1, @serializer,
              @exceptions, @non_deliveries, Hash, nil).and_return(@broker1).by_default

      @address2 = {:host => "second", :port => 5672, :index => 1}
      @identity2 = "rs-broker-second-5672"
      @broker2 = flexmock("broker_client2", :identity => @identity2, :usable? => true,
                          :alias => "b1", :host => "second", :port => 5672, :index => 1)
      flexmock(RightAMQP::BrokerClient).should_receive(:new).with(@identity2, @address2, @serializer,
              @exceptions, @non_deliveries, Hash, nil).and_return(@broker2).by_default

      @address3 = {:host => "third", :port => 5672, :index => 2}
      @identity3 = "rs-broker-third-5672"
      @broker3 = flexmock("broker_client3", :identity => @identity3, :usable? => true,
                          :alias => "b2", :host => "third", :port => 5672, :index => 2)
      flexmock(RightAMQP::BrokerClient).should_receive(:new).with(@identity3, @address3, @serializer,
              @exceptions, @non_deliveries, Hash, nil).and_return(@broker3).by_default
    end

    it "should use host and port to uniquely identify broker in AgentIdentity format" do
      RightAMQP::HABrokerClient.identity("localhost", 5672).should == "rs-broker-localhost-5672"
      RightAMQP::HABrokerClient.identity("10.21.102.23", 1234).should == "rs-broker-10.21.102.23-1234"
    end

    it "should replace '-' with '~' in host names when forming broker identity" do
      RightAMQP::HABrokerClient.identity("9-1-1", 5672).should == "rs-broker-9~1~1-5672"
    end

    it "should use default port when forming broker identity" do
      RightAMQP::HABrokerClient.identity("10.21.102.23").should == "rs-broker-10.21.102.23-5672"
    end

    it "should list broker identities" do
      RightAMQP::HABrokerClient.identities("first,second", "5672, 5674").should ==
        ["rs-broker-first-5672", "rs-broker-second-5674"]
    end

    it "should convert identities into aliases" do
      ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first:0, third:2", :port => 5672)
      ha.aliases([@identity3]).should == ["b2"]
      ha.aliases([@identity3, @identity1]).should == ["b2", "b0"]
    end

    it "should convert identities into nil alias when unknown" do
      ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first:0, third:2", :port => 5672)
      ha.aliases(["rs-broker-second-5672", nil]).should == [nil, nil]
    end

    it "should convert identity into alias" do
      ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first:0, third:2", :port => 5672)
      ha.alias_(@identity3).should == "b2"
    end

    it "should convert identity into nil alias when unknown" do
      ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first:0, third:2", :port => 5672)
      ha.alias_("rs-broker-second-5672").should be_nil
    end

    # Expected parts are [host, port, index, priority]; "third" is second in
    # the client's list, hence the trailing 1 -- NOTE(review): the meaning of
    # the fourth element is inferred from the data, confirm against identity_parts
    it "should convert identity into parts" do
      ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first:0, third:2", :port => 5672)
      ha.identity_parts(@identity3).should == ["third", 5672, 2, 1]
    end

    it "should convert an alias into parts" do
      ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first:0, third:2", :port => 5672)
      ha.identity_parts("b2").should == ["third", 5672, 2, 1]
    end

    it "should convert unknown identity into nil parts" do
      ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first:0, third:2", :port => 5672)
      ha.identity_parts("rs-broker-second-5672").should == [nil, nil, nil, nil]
    end

    it "should get identity from identity" do
      ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first:0, third:2", :port => 5672)
      ha.get(@identity1).should == @identity1
      ha.get("rs-broker-second-5672").should be_nil
      ha.get(@identity3).should == @identity3
    end

    it "should get identity from an alias" do
      ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first:0, third:2", :port => 5672)
      ha.get("b0").should == @identity1
      ha.get("b1").should be_nil
      ha.get("b2").should == @identity3
    end

    # The generated lists follow the order in which hosts were given, not index order
    it "should generate host:index list" do
      ha = RightAMQP::HABrokerClient.new(@serializer, :host => "second:1, first:0, third:2", :port => 5672)
      ha.hosts.should == "second:1,first:0,third:2"
    end

    it "should generate port:index list" do
      ha = RightAMQP::HABrokerClient.new(@serializer, :host => "second:1, third:2, first:0", :port => 5672)
      ha.ports.should == "5672:1,5672:2,5672:0"
    end

  end # when identifying

  context "when" do

    # Builds flexmock doubles for three BrokerClients and exposes them to the
    # examples as @identityN, @addressN and @brokerN (N = 1..3). BrokerClient.new
    # is stubbed to return the double matching the identity and address passed in.
    # Instance variables are assigned with instance_variable_set rather than by
    # eval'ing generated source strings.
    before(:each) do
      # key => [index, host, alias]
      { 1 => [0, "first",  "b0"],
        2 => [1, "second", "b1"],
        3 => [2, "third",  "b2"] }.each do |key, spec|
        index, host, broker_alias = spec
        identity = "rs-broker-#{host}-5672"
        address = {:host => host, :port => 5672, :index => index}
        broker = flexmock("broker_client#{key}", :identity => identity, :alias => broker_alias,
                          :host => host, :port => 5672, :index => index)
        # by_default so that individual examples can override any of these stubs
        broker.should_receive(:status).and_return(:connected).by_default
        broker.should_receive(:usable?).and_return(true).by_default
        broker.should_receive(:connected?).and_return(true).by_default
        broker.should_receive(:subscribe).and_return(true).by_default
        flexmock(RightAMQP::BrokerClient).should_receive(:new).with(identity, address,
                 @serializer, @exceptions, @non_deliveries, Hash, nil).and_return(broker).by_default
        instance_variable_set("@identity#{key}", identity)
        instance_variable_set("@address#{key}", address)
        instance_variable_set("@broker#{key}", broker)
      end
    end
  
    # Covers HABrokerClient#connect. Each broker double carries a fixed alias
    # ("b0", "b1", "b2") assigned by the enclosing before(:each), so the alias
    # at each position of ha.brokers shows which double ended up there.
    context "connecting" do

      it "should connect and add a new broker client to the end of the list" do
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first", :port => 5672)
        ha.brokers.size.should == 1
        ha.brokers[0].alias.should == "b0"
        flexmock(RightAMQP::BrokerClient).should_receive(:new).with(@identity2, @address2, @serializer,
                 @exceptions, @non_deliveries, Hash, nil).and_return(@broker2).once
        res = ha.connect("second", 5672, 1)
        res.should be_true
        ha.brokers.size.should == 2
        ha.brokers[1].alias.should == "b1"
      end

      it "should reconnect an existing broker client after closing it if it is not connected" do
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second")
        ha.brokers.size.should == 2
        @broker1.should_receive(:usable?).and_return(false)
        @broker1.should_receive(:close).and_return(true).once
        # The existing client is expected to be passed as the last argument
        flexmock(RightAMQP::BrokerClient).should_receive(:new).with(@identity1, @address1, @serializer,
                 @exceptions, @non_deliveries, Hash, ha.brokers[0]).and_return(@broker1).once
        res = ha.connect("first", 5672, 0)
        res.should be_true
        ha.brokers.size.should == 2
        ha.brokers[0].alias.should == "b0"
        ha.brokers[1].alias.should == "b1"
      end

      it "should not do anything except log a message if asked to reconnect an already connected broker client" do
        @logger.should_receive(:info).with(/Ignored request to reconnect/).once
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second")
        ha.brokers.size.should == 2
        @broker1.should_receive(:status).and_return(:connected).once
        @broker1.should_receive(:close).and_return(true).never
        flexmock(RightAMQP::BrokerClient).should_receive(:new).with(@identity1, @address1, @serializer,
                 @exceptions, @non_deliveries, Hash, ha.brokers[0]).and_return(@broker1).never
        res = ha.connect("first", 5672, 0)
        res.should be_false
        ha.brokers.size.should == 2
        ha.brokers[0].alias.should == "b0"
        ha.brokers[1].alias.should == "b1"
      end

      it "should reconnect already connected broker client if force specified" do
        @logger.should_receive(:info).with(/Ignored request to reconnect/).never
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second")
        ha.brokers.size.should == 2
        @broker1.should_receive(:close).and_return(true).once
        flexmock(RightAMQP::BrokerClient).should_receive(:new).with(@identity1, @address1, @serializer,
                 @exceptions, @non_deliveries, Hash, ha.brokers[0]).and_return(@broker1).once
        # Arguments: host, port, index, priority (none), force
        res = ha.connect("first", 5672, 0, nil, true)
        res.should be_true
        ha.brokers.size.should == 2
        ha.brokers[0].alias.should == "b0"
        ha.brokers[1].alias.should == "b1"
      end

      it "should be able to change host and port of an existing broker client" do
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second")
        ha.brokers.size.should == 2
        @broker1.should_receive(:close).and_return(true).once
        flexmock(RightAMQP::BrokerClient).should_receive(:new).with(@identity3, @address3.merge(:index => 0),
                 @serializer, @exceptions, @non_deliveries, Hash, nil).and_return(@broker3).once
        res = ha.connect("third", 5672, 0)
        res.should be_true
        ha.brokers.size.should == 2
        # Slot 0 now holds the "third" double. Its alias is fixed at "b2" by the
        # mock setup, so the replacement is verified through its identity instead.
        ha.brokers[0].identity.should == @identity3
        ha.brokers[1].alias.should == "b1"
        ha.brokers[1].identity.should == @identity2
      end

      it "should slot broker client into specified priority position when at end of list" do
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second")
        ha.brokers.size.should == 2
        res = ha.connect("third", 5672, 2, 2)
        res.should be_true
        ha.brokers.size.should == 3
        ha.brokers[0].alias.should == "b0"
        ha.brokers[1].alias.should == "b1"
        ha.brokers[2].alias.should == "b2"
      end

      it "should slot broker client into specified priority position when already is a client in that position" do
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second")
        ha.brokers.size.should == 2
        res = ha.connect("third", 5672, 2, 1)
        res.should be_true
        ha.brokers.size.should == 3
        # The new client takes position 1 and the previous occupant moves down
        ha.brokers[0].alias.should == "b0"
        ha.brokers[1].alias.should == "b2"
        ha.brokers[2].alias.should == "b1"
      end

      it "should slot broker client into next priority position if specified priority would leave a gap" do
        @logger.should_receive(:info).with(/Reduced priority setting for broker/).once
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first")
        ha.brokers.size.should == 1
        res = ha.connect("third", 5672, 2, 2)
        res.should be_true
        ha.brokers.size.should == 2
        ha.brokers[0].alias.should == "b0"
        ha.brokers[1].alias.should == "b2"
      end

      it "should yield to the block provided with the newly connected broker identity" do
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first")
        ha.brokers.size.should == 1
        ha.brokers[0].alias.should == "b0"
        identity = nil
        res = ha.connect("second", 5672, 1) { |i| identity = i }
        res.should be_true
        identity.should == @identity2
        ha.brokers.size.should == 2
        ha.brokers[1].alias.should == "b1"
      end

    end # connecting

    # Covers HABrokerClient#subscribe fan-out: @broker1 is made unusable in
    # every example, so it must never be asked to subscribe
    context "subscribing" do

      it "should subscribe on all usable broker clients and return their identities" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        @broker1.should_receive(:usable?).and_return(false)
        @broker1.should_receive(:subscribe).never
        @broker2.should_receive(:subscribe).and_return(true).once
        @broker3.should_receive(:subscribe).and_return(true).once
        queue = {:name => "queue"}
        exchange = {:type => :direct, :name => "exchange"}
        client.subscribe(queue, exchange).should == [@identity2, @identity3]
      end

      it "should not return the identity if subscribe fails" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        @broker1.should_receive(:usable?).and_return(false)
        @broker1.should_receive(:subscribe).never
        @broker2.should_receive(:subscribe).and_return(true).once
        @broker3.should_receive(:subscribe).and_return(false).once
        queue = {:name => "queue"}
        exchange = {:type => :direct, :name => "exchange"}
        client.subscribe(queue, exchange).should == [@identity2]
      end

      it "should subscribe only on specified brokers" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        @broker1.should_receive(:usable?).and_return(false)
        @broker1.should_receive(:subscribe).never
        @broker2.should_receive(:subscribe).and_return(true).once
        @broker3.should_receive(:subscribe).never
        queue = {:name => "queue"}
        exchange = {:type => :direct, :name => "exchange"}
        # @broker3 is usable but is left out of the :brokers restriction
        subscribed = client.subscribe(queue, exchange, :brokers => [@identity1, @identity2])
        subscribed.should == [@identity2]
      end

    end # subscribing

    # Covers HABrokerClient#unsubscribe, including its completion block and
    # the optional timeout timer
    context "unsubscribing" do

      # Stubs EM::Timer and gives every broker double one queue named
      # "my_queue" plus an unsubscribe that yields immediately
      before(:each) do
        @timer = flexmock("timer", :cancel => true).by_default
        flexmock(EM::Timer).should_receive(:new).and_return(@timer).by_default
        @queue_name = "my_queue"
        @queue = flexmock("queue", :name => @queue_name)
        @queues = [@queue]
        [@broker1, @broker2, @broker3].each do |broker|
          broker.should_receive(:queues).and_return(@queues).by_default
          broker.should_receive(:unsubscribe).and_return(true).and_yield.by_default
        end
      end

      it "should unsubscribe from named queues on all usable broker clients" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        @broker1.should_receive(:usable?).and_return(false)
        @broker1.should_receive(:unsubscribe).never
        @broker2.should_receive(:unsubscribe).and_return(true).once
        @broker3.should_receive(:unsubscribe).and_return(true).once
        client.unsubscribe([@queue_name]).should be_true
      end

      it "should yield to supplied block after unsubscribing" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        client.subscribe({:name => @queue_name}, {:type => :direct, :name => "exchange"})
        yields = 0
        client.unsubscribe([@queue_name]) { yields += 1 }
        yields.should == 1
      end

      it "should yield to supplied block if timeout before finish unsubscribing" do
        # and_yield makes the stubbed timer run its block as soon as it is created
        flexmock(EM::Timer).should_receive(:new).with(10, Proc).and_return(@timer).and_yield.once
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        client.subscribe({:name => @queue_name}, {:type => :direct, :name => "exchange"})
        yields = 0
        client.unsubscribe([@queue_name], 10) { yields += 1 }
        yields.should == 1
      end

      it "should cancel timer if finish unsubscribing before timer fires" do
        @timer.should_receive(:cancel).once
        flexmock(EM::Timer).should_receive(:new).with(10, Proc).and_return(@timer).once
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        client.subscribe({:name => @queue_name}, {:type => :direct, :name => "exchange"})
        yields = 0
        client.unsubscribe([@queue_name], 10) { yields += 1 }
        yields.should == 1
      end

      it "should yield to supplied block after unsubscribing even if no queues to unsubscribe" do
        [@broker1, @broker2, @broker3].each do |broker|
          broker.should_receive(:queues).and_return([])
        end
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        yields = 0
        client.unsubscribe([@queue_name]) { yields += 1 }
        yields.should == 1
      end

      it "should yield to supplied block once after unsubscribing all queues" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        client.subscribe({:name => @queue_name}, {:type => :direct, :name => "exchange"})
        yields = 0
        client.unsubscribe([@queue_name]) { yields += 1 }
        yields.should == 1
      end

    end # unsubscribing

    # Covers HABrokerClient#declare fan-out: @broker1 is made unusable in every
    # example, so it must never be asked to declare
    context "declaring" do

      it "should declare exchange on all usable broker clients and return their identities" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        @broker1.should_receive(:usable?).and_return(false)
        @broker1.should_receive(:declare).never
        @broker2.should_receive(:declare).and_return(true).once
        @broker3.should_receive(:declare).and_return(true).once
        declared = client.declare(:exchange, "x", :durable => true)
        declared.should == [@identity2, @identity3]
      end

      it "should not return the identity if declare fails" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        @broker1.should_receive(:usable?).and_return(false)
        @broker1.should_receive(:declare).never
        @broker2.should_receive(:declare).and_return(true).once
        @broker3.should_receive(:declare).and_return(false).once
        declared = client.declare(:exchange, "x", :durable => true)
        declared.should == [@identity2]
      end

      it "should declare exchange only on specified brokers" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        @broker1.should_receive(:usable?).and_return(false)
        @broker1.should_receive(:declare).never
        @broker2.should_receive(:declare).and_return(true).once
        @broker3.should_receive(:declare).never
        # @broker3 is usable but is left out of the :brokers restriction
        declared = client.declare(:exchange, "x", :durable => true, :brokers => [@identity1, @identity2])
        declared.should == [@identity2]
      end

    end # declaring

    context "checking status" do

      before(:each) do
        @timeout = 10
        @timer = flexmock("timer", :cancel => true).by_default
        flexmock(EM::Timer).should_receive(:new).with(@timeout, Proc).and_return(@timer).by_default
        @queue_name = "my_queue"
        @queue = flexmock("queue", :name => @queue_name)
        @queues = [@queue]
        # By default both brokers are connected, own the queue, and accept
        # queue status requests
        [@broker1, @broker2].each do |broker|
          broker.should_receive(:queues).and_return(@queues).by_default
          broker.should_receive(:connected?).and_return(true).by_default
          broker.should_receive(:queue_status).and_return(true).by_default
        end
        @ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second")
      end

      it "should not check status if there are no queues" do
        outcome = @ha.queue_status([])
        outcome.should be_true
      end

      it "should make block call with empty status if there are no queues" do
        callbacks = 0
        outcome = @ha.queue_status(["my_other_queue"], @timeout) do |status|
          status.should == {}
          callbacks += 1
        end
        outcome.should be_true
        callbacks.should == 1
      end

      it "should wait to make callback until the status of all queues is obtained" do
        @broker1.should_receive(:queue_status).with([@queue_name], Proc).and_return(true).and_yield(@queue_name, 0, 1).once
        @broker2.should_receive(:queue_status).with([@queue_name], Proc).and_return(true).and_yield(@queue_name, 1, 2).once
        callbacks = 0
        outcome = @ha.queue_status([@queue_name], @timeout) do |status|
          # A single combined status keyed by queue name and then broker identity
          per_broker = {@broker1.identity => {:messages => 0, :consumers => 1},
                        @broker2.identity => {:messages => 1, :consumers => 2}}
          status.should == {@queue_name => per_broker}
          callbacks += 1
        end
        outcome.should be_true
        callbacks.should == 1
      end

      it "should account for queues for which status cannot be obtained" do
        # First broker refuses the request (returns false and never yields)
        @broker1.should_receive(:queue_status).with([@queue_name], Proc).and_return(false).once
        @broker2.should_receive(:queue_status).with([@queue_name], Proc).and_return(true).and_yield(@queue_name, 1, 2).once
        callbacks = 0
        outcome = @ha.queue_status([@queue_name], @timeout) do |status|
          per_broker = {@broker2.identity => {:messages => 1, :consumers => 2}}
          status.should == {@queue_name => per_broker}
          callbacks += 1
        end
        outcome.should be_true
        callbacks.should == 1
      end

    end # checking status

    context "publishing" do

      # Unless an example overrides it, every broker accepts the publish
      before(:each) do
        [@broker1, @broker2, @broker3].each do |broker|
          broker.should_receive(:publish).and_return(true).by_default
        end
      end

      it "should serialize message, publish it, and return list of broker identifiers" do
        @serializer.should_receive(:dump).with(@packet).and_return(@message).once
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        exchange = {:type => :direct, :name => "exchange", :options => {:durable => true}}
        published_to = client.publish(exchange, @packet, :persistent => true)
        published_to.should == [@identity1]
      end

      it "should try other broker clients if a publish fails" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        @broker1.should_receive(:publish).and_return(false)
        exchange = {:type => :direct, :name => "exchange"}
        published_to = client.publish(exchange, @packet)
        published_to.should == [@identity2]
      end

      it "should publish to a randomly selected broker if random requested" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        # Fixed seed so that the "random" selection is reproducible
        srand(100)
        exchange = {:type => :direct, :name => "exchange"}
        candidates = [@identity1, @identity2, @identity3]
        published_to = client.publish(exchange, @packet, :order => :random, :brokers => candidates)
        published_to.should == [@identity2]
      end

      it "should publish to all connected brokers if fanout requested" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        exchange = {:type => :direct, :name => "exchange"}
        candidates = [@identity1, @identity2]
        published_to = client.publish(exchange, @packet, :fanout => true, :brokers => candidates)
        published_to.should == [@identity1, @identity2]
      end

      it "should publish only using specified brokers" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        exchange = {:type => :direct, :name => "exchange"}
        candidates = [@identity1, @identity2]
        published_to = client.publish(exchange, @packet, :brokers => candidates)
        published_to.should == [@identity1]
      end

      it "should log an error if a selected broker is unknown but still publish with any remaining brokers" do
        @logger.should_receive(:error).with(/Invalid broker identity "rs-broker-fourth-5672"/).once
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        exchange = {:type => :direct, :name => "exchange"}
        candidates = ["rs-broker-fourth-5672", @identity1]
        published_to = client.publish(exchange, @packet, :brokers => candidates)
        published_to.should == [@identity1]
      end

      it "should raise an exception if all available brokers fail to publish" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        [@broker1, @broker2, @broker3].each do |broker|
          broker.should_receive(:publish).and_return(false)
        end
        exchange = {:type => :direct, :name => "exchange"}
        attempt = lambda { client.publish(exchange, @packet) }
        attempt.should raise_error(RightAMQP::HABrokerClient::NoConnectedBrokers)
      end

      it "should not serialize the message if it is already serialized" do
        @serializer.should_receive(:dump).with(@packet).and_return(@message).never
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        exchange = {:type => :direct, :name => "exchange"}
        published_to = client.publish(exchange, @packet, :no_serialize => true)
        published_to.should == [@identity1]
      end

      it "should store message info for use by message returns if :mandatory specified" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        exchange = {:type => :direct, :name => "exchange"}
        client.publish(exchange, @packet, :mandatory => true).should == [@identity1]
        # Peek at the internal cache of published message contexts
        cache = client.instance_variable_get(:@published).instance_variable_get(:@cache)
        cache.size.should == 1
      end

      it "should not store message info for use by message returns if message already serialized" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        exchange = {:type => :direct, :name => "exchange"}
        client.publish(exchange, @packet, :no_serialize => true).should == [@identity1]
        cache = client.instance_variable_get(:@published).instance_variable_get(:@cache)
        cache.size.should == 0
      end

      it "should not store message info for use by message returns if mandatory not specified" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        exchange = {:type => :direct, :name => "exchange"}
        client.publish(exchange, @packet).should == [@identity1]
        cache = client.instance_variable_get(:@published).instance_variable_get(:@cache)
        cache.size.should == 0
      end

    end # publishing

    context "returning" do

      # Unless an example overrides it, every broker accepts the publish
      before(:each) do
        [@broker1, @broker2, @broker3].each do |broker|
          broker.should_receive(:publish).and_return(true).by_default
        end
      end

      context "when non-delivery" do

        it "should store non-delivery block for use by return handler" do
          client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
          handler = lambda {}
          client.non_delivery(&handler)
          client.instance_variable_get(:@non_delivery).should == handler
        end

      end

      context "when handling return" do

        before(:each) do
          @options = {}
          @brokers = [@identity1, @identity2]
          @ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second")
          @context = RightAMQP::HABrokerClient::Context.new(@packet, @options, @brokers)
          # Looking up the returned message yields the context built above
          published = @ha.instance_variable_get(:@published)
          flexmock(published).should_receive(:fetch).with(@message).and_return(@context).by_default
        end

        # Expect exactly one info log entry matching each of the given patterns
        def expect_info_logged(*patterns)
          patterns.each { |pattern| @logger.should_receive(:info).with(pattern).once }
        end

        # Invoke the private return handler as if the given broker had
        # returned @message for the given reason
        def return_message(identity, reason)
          @ha.__send__(:handle_return, identity, "to", reason, @message)
        end

        it "should republish using a broker not yet tried if possible and log that re-routing" do
          expect_info_logged(/RE-ROUTE/, /RETURN reason/)
          @broker2.should_receive(:publish).and_return(true).once
          @context.record_failure(@identity1)
          return_message(@identity1, "reason")
        end

        it "should republish to same broker without mandatory if message is persistent and no other brokers available" do
          expect_info_logged(/RE-ROUTE/, /RETURN reason/)
          [@identity1, @identity2].each { |identity| @context.record_failure(identity) }
          @packet.should_receive(:persistent).and_return(true)
          @broker1.should_receive(:publish).and_return(true).once
          return_message(@identity2, "NO_CONSUMERS")
        end

        it "should republish to same broker without mandatory if message is one-way and no other brokers available" do
          expect_info_logged(/RE-ROUTE/, /RETURN reason/)
          [@identity1, @identity2].each { |identity| @context.record_failure(identity) }
          @packet.should_receive(:one_way).and_return(true)
          @broker1.should_receive(:publish).and_return(true).once
          return_message(@identity2, "NO_CONSUMERS")
        end

        it "should update status to :stopping if message returned because access refused" do
          expect_info_logged(/RE-ROUTE/, /RETURN reason/)
          @context.record_failure(@identity1)
          @broker2.should_receive(:publish).and_return(true).once
          @broker1.should_receive(:update_status).with(:stopping).and_return(true).once
          return_message(@identity1, "ACCESS_REFUSED")
        end

        it "should log info and make non-delivery call even if persistent when returned because of no queue" do
          expect_info_logged(/NO ROUTE/, /RETURN reason/)
          notified = 0
          @ha.non_delivery { |_reason, _type, _token, _from, _to| notified += 1 }
          [@identity1, @identity2].each { |identity| @context.record_failure(identity) }
          @packet.should_receive(:persistent).and_return(true)
          @broker1.should_receive(:publish).and_return(true).never
          @broker2.should_receive(:publish).and_return(true).never
          return_message(@identity2, "NO_QUEUE")
          notified.should == 1
        end

        it "should log info and make non-delivery call if no route can be found" do
          expect_info_logged(/NO ROUTE/, /RETURN reason/)
          notified = 0
          @ha.non_delivery { |_reason, _type, _token, _from, _to| notified += 1 }
          [@identity1, @identity2].each { |identity| @context.record_failure(identity) }
          @broker1.should_receive(:publish).and_return(true).never
          @broker2.should_receive(:publish).and_return(true).never
          return_message(@identity2, "any reason")
          notified.should == 1
        end

        it "should log info if no message context available for re-routing it" do
          expect_info_logged(/Dropping/)
          # Override the default lookup so that no context is found
          published = @ha.instance_variable_get(:@published)
          flexmock(published).should_receive(:fetch).with(@message).and_return(nil).once
          return_message(@identity2, "any reason")
        end

      end

    end # returning

    context "deleting" do

      # Every example works against a three-broker client whose first broker
      # is reported as unusable.
      before(:each) do
        @ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        @broker1.should_receive(:usable?).and_return(false)
      end

      it "should delete queue on all usable broker clients and return their identities" do
        @broker1.should_receive(:delete).never
        @broker2.should_receive(:delete).and_return(true).once
        @broker3.should_receive(:delete).and_return(true).once
        deleted_on = @ha.delete("queue")
        deleted_on.should == [@identity2, @identity3]
      end

      it "should not return the identity if delete fails" do
        @broker1.should_receive(:delete).never
        @broker2.should_receive(:delete).and_return(true).once
        # A false return from the broker counts as a failed delete
        @broker3.should_receive(:delete).and_return(false).once
        deleted_on = @ha.delete("queue")
        deleted_on.should == [@identity2]
      end

      it "should delete queue from cache on all usable broker clients and return their identities" do
        @broker1.should_receive(:delete_amqp_resources).never
        @broker2.should_receive(:delete_amqp_resources).and_return(true).once
        @broker3.should_receive(:delete_amqp_resources).and_return(true).once
        deleted_on = @ha.delete_amqp_resources("queue")
        deleted_on.should == [@identity2, @identity3]
      end

    end # deleting

    # Examples for HABrokerClient#remove and #declare_unusable
    context "removing" do

      it "should remove broker client after disconnecting and pass identity to block" do
        @logger.should_receive(:info).with(/Removing/).once
        @broker2.should_receive(:close).with(true, true, false).once
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        identity = nil
        result = ha.remove("second", 5672) { |i| identity = i }
        # The removed identity is both returned and yielded to the block
        result.should == @identity2
        identity.should == @identity2
        ha.get(@identity1).should_not be_nil
        ha.get(@identity2).should be_nil
        ha.get(@identity3).should_not be_nil
        ha.brokers.size.should == 2
      end

      it "should remove broker when no block supplied but still return a result" do
        @logger.should_receive(:info).with(/Removing/).once
        @broker2.should_receive(:close).once
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        result = ha.remove("second", 5672)
        result.should == @identity2
        ha.get(@identity1).should_not be_nil
        ha.get(@identity2).should be_nil
        ha.get(@identity3).should_not be_nil
        ha.brokers.size.should == 2
      end

      it "should remove last broker if requested" do
        @logger.should_receive(:info).with(/Removing/).times(3)
        @broker1.should_receive(:close).once
        @broker2.should_receive(:close).once
        @broker3.should_receive(:close).once
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        result = ha.remove("second", 5672)
        result.should == @identity2
        ha.get(@identity2).should be_nil
        result = ha.remove("third", 5672)
        result.should == @identity3
        ha.get(@identity3).should be_nil
        ha.brokers.size.should == 1
        # Removing the only remaining broker leaves the client with none
        identity = nil
        result = ha.remove("first", 5672) { |i| identity = i }
        result.should == @identity1
        identity.should == @identity1
        ha.get(@identity1).should be_nil
        ha.brokers.size.should == 0
      end

      it "should return nil and not execute block if broker is unknown" do
        @logger.should_receive(:info).with(/Ignored request to remove/).once
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        # Supply a block so that the "not execute block" half of this example
        # is actually verified
        called = 0
        ha.remove("fourth", 5672) { |_| called += 1 }.should be_nil
        called.should == 0
        ha.brokers.size.should == 3
      end

      it "should close connection and mark as failed when told broker is not usable" do
        @broker2.should_receive(:close).with(true, false, false).once
        @broker3.should_receive(:close).with(true, false, false).once
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        ha.declare_unusable([@identity2, @identity3])
        # Unlike remove, the brokers stay in the list
        ha.brokers.size.should == 3
      end

      it "should raise an exception if broker that is declared not usable is unknown" do
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        lambda { ha.declare_unusable(["rs-broker-fourth-5672"]) }.should raise_error(Exception, /Cannot mark unknown/)
        ha.brokers.size.should == 3
      end

    end # removing

    context "monitoring" do

      # Default fixtures: a stubbed EM timer, a stand-alone "localhost" broker
      # mock that BrokerClient.new returns by default, and non-failed defaults
      # for the three shared broker mocks.
      before(:each) do
        @timer = flexmock("timer")
        flexmock(EM::Timer).should_receive(:new).and_return(@timer).by_default
        @timer.should_receive(:cancel).by_default
        @identity = "rs-broker-localhost-5672"
        @address = {:host => "localhost", :port => 5672, :index => 0}
        @broker = flexmock("broker_client", :identity => @identity, :alias => "b0", :host => "localhost",
                            :port => 5672, :index => 0)
        broker_defaults = [[:status, :connected], [:usable?, true], [:connected?, true], [:subscribe, true]]
        broker_defaults.each do |name, value|
          @broker.should_receive(name).and_return(value).by_default
        end
        flexmock(RightAMQP::BrokerClient).should_receive(:new).and_return(@broker).by_default
        [@broker1, @broker2, @broker3].each do |broker|
          broker.should_receive(:failed?).and_return(false).by_default
        end
      end

      # Generate the same pair of examples for each status that the private
      # HABrokerClient#each is asked to filter on here.
      [:usable, :connected].each do |status|
        predicate = :"#{status}?"

        it "should give access to or list #{status} brokers" do
          client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")

          # With the default stubs all three brokers are yielded and returned
          visited = []
          selected = client.__send__(:each, status) { |broker| visited << broker.alias }
          visited.should == ["b0", "b1", "b2"]
          selected.size.should == 3
          selected.map { |broker| broker.alias }.should == ["b0", "b1", "b2"]

          # Once only the first broker answers true it is the only one selected
          @broker1.should_receive(predicate).and_return(true)
          @broker2.should_receive(predicate).and_return(false)
          @broker3.should_receive(predicate).and_return(false)
          visited = []
          selected = client.__send__(:each, status) { |broker| visited << broker.alias }
          visited.should == ["b0"]
          selected.size.should == 1
          selected.map { |broker| broker.alias }.should == ["b0"]
        end

        it "should give access to each selected #{status} broker" do
          client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
          @broker2.should_receive(predicate).and_return(true)
          @broker3.should_receive(predicate).and_return(false)
          visited = []
          candidates = [@identity2, @identity3]
          selected = client.__send__(:each, status, candidates) { |broker| visited << broker.alias }
          visited.should == ["b1"]
          selected.size.should == 1
          selected.map { |broker| broker.alias }.should == ["b1"]
        end
      end

      # Only the brokers answering false to usable? are expected in the list
      it "should give list of unusable brokers" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        [[@broker1, true], [@broker2, false], [@broker3, false]].each do |broker, usable|
          broker.should_receive(:usable?).and_return(usable)
        end
        client.unusable.should == [@identity2, @identity3]
      end

      # connected? mirrors the broker's own answer and is nil for an identity
      # that the client does not know.
      it "should tell whether a broker is connected" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        @broker2.should_receive(:connected?).and_return(false)
        @broker3.should_receive(:connected?).and_return(true)
        second_connected = client.connected?(@identity2)
        second_connected.should be_false
        third_connected = client.connected?(@identity3)
        third_connected.should be_true
        unknown_connected = client.connected?("rs-broker-fourth-5672")
        unknown_connected.should be_nil
      end

      # Identities are expected in the same order as the configured hosts
      it "should give list of all brokers" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        every_identity = [@identity1, @identity2, @identity3]
        client.all.should == every_identity
      end

      # Only the brokers answering true to failed? are expected in the list
      it "should give list of failed brokers" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        [[@broker1, true], [@broker2, false], [@broker3, true]].each do |broker, has_failed|
          broker.should_receive(:failed?).and_return(has_failed)
        end
        client.failed.should == [@identity1, @identity3]
      end

      # status collects each broker's summary, in broker order
      it "should give broker client status list" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        [[@broker1, "summary1"], [@broker2, "summary2"], [@broker3, "summary3"]].each do |broker, summary|
          broker.should_receive(:summary).and_return(summary)
        end
        client.status.should == ["summary1", "summary2", "summary3"]
      end

      # stats combines the per-broker stats with client-level entries, which
      # are all nil for a client that has not recorded any activity.
      it "should give broker client statistics" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        [[@broker1, "stats1"], [@broker2, "stats2"], [@broker3, "stats3"]].each do |broker, stats|
          broker.should_receive(:stats).and_return(stats)
        end
        expected = {"brokers" => ["stats1", "stats2", "stats3"],
                    "exceptions" => nil,
                    "heartbeat" => nil,
                    "non-deliveries" => nil,
                    "returns" => nil}
        client.stats.should == expected
      end

      # NOTE(review): the second update_status argument looks like the
      # broker's previous connected state -- confirm against HABrokerClient
      it "should log broker client status update if there is a change" do
        @logger.should_receive(:info).with(/Broker b0 is now connected/).once
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        was_connected = false
        client.__send__(:update_status, @broker1, was_connected)
      end

      # NOTE(review): the second update_status argument looks like the
      # broker's previous connected state -- confirm against HABrokerClient
      it "should not log broker client status update if there is no change" do
        @logger.should_receive(:info).with(/Broker b0 is now connected/).never
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        was_connected = true
        client.__send__(:update_status, @broker1, was_connected)
      end

      # The broker now reports itself as disconnected while update_status is
      # told true, so a "now disconnected" log entry is expected.
      it "should log broker client status update when become disconnected" do
        @logger.should_receive(:info).with(/Broker b0 is now disconnected/).once
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        @broker1.should_receive(:status).and_return(:disconnected)
        @broker1.should_receive(:connected?).and_return(false)
        was_connected = true
        client.__send__(:update_status, @broker1, was_connected)
      end

      # Registers a connection status callback with default options on a
      # two-broker client, then drives update_status step by step and checks
      # how many times each status has been reported after every step.
      it "should provide connection status callback when cross 0/1 connection boundary" do
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second")
        connected = 0
        disconnected = 0
        ha.connection_status do |status|
          if status == :connected
            # :connected is only valid while at least one broker is connected
            (ha.brokers[0].status == :connected ||
             ha.brokers[1].status == :connected).should be_true
            connected += 1
          elsif status == :disconnected
            # :disconnected is only valid once both brokers are disconnected
            (ha.brokers[0].status == :disconnected &&
             ha.brokers[1].status == :disconnected).should be_true
            disconnected += 1
          end
        end
        # Step 1: update for broker1 while both brokers still report connected;
        # no callback expected
        ha.__send__(:update_status, @broker1, false)
        connected.should == 0
        disconnected.should == 0
        # Step 2: broker1 now reports disconnected; no callback expected yet
        @broker1.should_receive(:status).and_return(:disconnected)
        @broker1.should_receive(:connected?).and_return(false)
        ha.__send__(:update_status, @broker1, true)
        connected.should == 0
        disconnected.should == 0
        # Step 3: broker2 also reports disconnected; one :disconnected callback expected
        @broker2.should_receive(:status).and_return(:disconnected)
        @broker2.should_receive(:connected?).and_return(false)
        ha.__send__(:update_status, @broker2, true)
        connected.should == 0
        disconnected.should == 1
        # TODO fix this test so that also checks crossing boundary as become connected
      end

      # Registers a connection status callback with :boundary => :all on a
      # two-broker client, then drives update_status step by step and checks
      # how many times each status has been reported after every step.
      it "should provide connection status callback when cross n/n-1 connection boundary when all specified" do
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second")
        connected = 0
        disconnected = 0
        ha.connection_status(:boundary => :all) do |status|
          if status == :connected
            # :connected is only valid while both brokers are connected
            (ha.brokers[0].status == :connected &&
             ha.brokers[1].status == :connected).should be_true
            connected += 1
          elsif status == :disconnected
            # :disconnected is valid as soon as either broker is disconnected
            (ha.brokers[0].status == :disconnected ||
             ha.brokers[1].status == :disconnected).should be_true
            disconnected += 1
          end
        end
        # Step 1: update for broker1 while both brokers report connected;
        # one :connected callback expected
        ha.__send__(:update_status, @broker1, false)
        connected.should == 1
        disconnected.should == 0
        # Step 2: broker1 now reports disconnected; one :disconnected callback expected
        @broker1.should_receive(:status).and_return(:disconnected)
        @broker1.should_receive(:connected?).and_return(false)
        ha.__send__(:update_status, @broker1, true)
        connected.should == 1
        disconnected.should == 1
        # Step 3: broker2 also reports disconnected; no further callback expected
        @broker2.should_receive(:status).and_return(:disconnected)
        @broker2.should_receive(:connected?).and_return(false)
        ha.__send__(:update_status, @broker2, true)
        connected.should == 1
        disconnected.should == 1
        # TODO fix this test so that also checks crossing boundary as become disconnected
        # NOTE(review): a :disconnected callback is asserted in step 2 above;
        # confirm whether this TODO is stale
      end

      # Registers a connection status callback restricted to the second and
      # third brokers of a three-broker client, then drives update_status step
      # by step and checks the callback counts after every step.
      it "should provide connection status callback for specific broker set" do
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        connected = 0
        disconnected = 0
        ha.connection_status(:brokers => [@identity2, @identity3]) do |status|
          if status == :connected
            # Only the two selected brokers are inspected
            (ha.brokers[1].status == :connected ||
             ha.brokers[2].status == :connected).should be_true
            connected += 1
          elsif status == :disconnected
            (ha.brokers[1].status == :disconnected &&
             ha.brokers[2].status == :disconnected).should be_true
            disconnected += 1
          end
        end
        # Steps 1 and 2: updates for broker1, which is not in the selected set;
        # no callback expected
        ha.__send__(:update_status, @broker1, false)
        connected.should == 0
        disconnected.should == 0
        @broker1.should_receive(:status).and_return(:disconnected)
        @broker1.should_receive(:connected?).and_return(false)
        ha.__send__(:update_status, @broker1, true)
        connected.should == 0
        disconnected.should == 0
        # Step 3: broker2 reports disconnected while broker3 is still
        # connected; no callback expected yet
        @broker2.should_receive(:status).and_return(:disconnected)
        @broker2.should_receive(:connected?).and_return(false)
        ha.__send__(:update_status, @broker2, true)
        connected.should == 0
        disconnected.should == 0
        # Step 4: broker3 also reports disconnected; one :disconnected callback expected
        @broker3.should_receive(:status).and_return(:disconnected)
        @broker3.should_receive(:connected?).and_return(false)
        ha.__send__(:update_status, @broker3, true)
        connected.should == 0
        disconnected.should == 1
      end

      # A :one_off callback is expected to fire on the first status update and
      # stay silent on the following one.
      it "should provide connection status callback only once when one-off is requested" do
        flexmock(RightAMQP::BrokerClient).should_receive(:new).
                with(@identity, @address, @serializer, @exceptions, @non_deliveries, Hash, nil).
                and_return(@broker).once
        client = RightAMQP::HABrokerClient.new(@serializer)
        callbacks = 0
        client.connection_status(:one_off => 10) { |_| callbacks += 1 }
        client.__send__(:update_status, @broker, false)
        callbacks.should == 1
        # A later disconnect must not trigger the callback again
        @broker.should_receive(:status).and_return(:disconnected)
        @broker.should_receive(:connected?).and_return(false)
        client.__send__(:update_status, @broker, true)
        callbacks.should == 1
      end

      # A :one_off request is expected to create one EM timer and to cancel it
      # once the callback has been made.
      it "should use connection status timer when one-off is requested" do
        flexmock(EM::Timer).should_receive(:new).and_return(@timer).once
        @timer.should_receive(:cancel).once
        flexmock(RightAMQP::BrokerClient).should_receive(:new).
                with(@identity, @address, @serializer, @exceptions, @non_deliveries, Hash, nil).
                and_return(@broker).once
        client = RightAMQP::HABrokerClient.new(@serializer)
        callbacks = 0
        client.connection_status(:one_off => 10) { |_| callbacks += 1 }
        client.__send__(:update_status, @broker, false)
        callbacks.should == 1
      end

      # The timer mock yields immediately, simulating expiry of the one-off
      # timeout before any status update arrives.
      it "should give timeout connection status if one-off request times out" do
        flexmock(EM::Timer).should_receive(:new).and_return(@timer).and_yield.once
        @timer.should_receive(:cancel).never
        flexmock(RightAMQP::BrokerClient).should_receive(:new).
                with(@identity, @address, @serializer, @exceptions, @non_deliveries, Hash, nil).
                and_return(@broker).once
        client = RightAMQP::HABrokerClient.new(@serializer)
        callbacks = 0
        client.connection_status(:one_off => 10) do |status|
          callbacks += 1
          status.should == :timeout
        end
        callbacks.should == 1
      end

      # Two callbacks registered side by side: the :one_off one is expected to
      # fire once, the :boundary => :all one on both status updates.
      it "should be able to have multiple connection status callbacks" do
        flexmock(RightAMQP::BrokerClient).should_receive(:new).
                with(@identity, @address, @serializer, @exceptions, @non_deliveries, Hash, nil).
                and_return(@broker).once
        client = RightAMQP::HABrokerClient.new(@serializer)
        one_off_calls = 0
        boundary_calls = 0
        client.connection_status(:one_off => 10) { |_| one_off_calls += 1 }
        client.connection_status(:boundary => :all) { |_| boundary_calls += 1 }
        client.__send__(:update_status, @broker, false)
        # Now have the only broker report itself as disconnected but not failed
        @broker.should_receive(:status).and_return(:disconnected)
        @broker.should_receive(:connected?).and_return(false)
        @broker.should_receive(:failed?).and_return(false)
        client.__send__(:update_status, @broker, true)
        one_off_calls.should == 1
        boundary_calls.should == 2
      end

      # With :boundary => :any nothing is expected to be reported when only
      # one broker has failed; :failed is expected once both have.
      it "should provide failed connection status callback when all broker connections fail with :any option" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second")
        connects = 0
        disconnects = 0
        failures = 0
        client.connection_status(:boundary => :any) do |status|
          case status
          when :connected
            connects += 1
          when :disconnected
            disconnects += 1
          when :failed
            # :failed is only valid once both brokers report failure
            (client.brokers[0].failed? &&
             client.brokers[1].failed?).should be_true
            failures += 1
          end
        end
        # Second broker fails first
        @broker2.should_receive(:failed?).and_return(true)
        @broker2.should_receive(:connected?).and_return(false)
        client.__send__(:update_status, @broker2, true)
        connects.should == 0
        disconnects.should == 0
        failures.should == 0
        # Then the first broker fails as well
        @broker1.should_receive(:failed?).and_return(true)
        @broker1.should_receive(:connected?).and_return(false)
        client.__send__(:update_status, @broker1, true)
        connects.should == 0
        disconnects.should == 0
        failures.should == 1
      end

      # With :boundary => :all the first failure is expected to be reported as
      # :disconnected, and :failed is expected once both brokers have failed.
      it "should provide failed connection status callback when all broker connections fail with :all option" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second")
        connects = 0
        disconnects = 0
        failures = 0
        client.connection_status(:boundary => :all) do |status|
          case status
          when :connected
            connects += 1
          when :disconnected
            disconnects += 1
          when :failed
            # :failed is only valid once both brokers report failure
            (client.brokers[0].failed? &&
             client.brokers[1].failed?).should be_true
            failures += 1
          end
        end
        # Second broker fails first
        @broker2.should_receive(:failed?).and_return(true)
        @broker2.should_receive(:connected?).and_return(false)
        client.__send__(:update_status, @broker2, true)
        connects.should == 0
        disconnects.should == 1
        failures.should == 0
        # Then the first broker fails as well
        @broker1.should_receive(:failed?).and_return(true)
        @broker1.should_receive(:connected?).and_return(false)
        client.__send__(:update_status, @broker1, true)
        connects.should == 0
        disconnects.should == 1
        failures.should == 1
      end

      it "should provide failed connection status callback when all brokers fail to connect" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        # Number of times each status has been delivered to the callback
        tally = {:connected => 0, :disconnected => 0, :failed => 0}
        client.connection_status(:boundary => :all) do |status|
          case status
          when :connected, :disconnected
            tally[status] += 1
          when :failed
            # All three brokers must already report failure when :failed is delivered
            one, two, three = client.brokers.values_at(0, 1, 2)
            (one.failed? && two.failed? && three.failed?).should be_true
            tally[:failed] += 1
          end
        end

        # Fail the brokers one after another; each row pairs a broker with the
        # number of :failed callbacks expected once its status has been updated
        [[@broker1, 0], [@broker2, 0], [@broker3, 1]].each do |broker, expected_failures|
          broker.should_receive(:failed?).and_return(true)
          broker.should_receive(:connected?).and_return(false)
          client.__send__(:update_status, broker, false)
          tally.values_at(:connected, :disconnected, :failed).should == [0, 0, expected_failures]
        end
      end

      it "should provide failed connection status callback when brokers selected and all brokers fail to connect" do
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        # Identities of the only brokers this callback is registered for
        selected = [@broker2.identity, @broker3.identity]
        connected = disconnected = failed = 0
        ha.connection_status(:boundary => :all, :brokers => selected) do |status|
          if status == :connected
            connected += 1
          elsif status == :disconnected
            disconnected += 1
          elsif status == :failed
            # Verify the brokers that were actually selected, looked up by identity,
            # rather than whichever brokers happen to sit at positions 0 and 1
            monitored = ha.brokers.select { |b| selected.include?(b.identity) }
            monitored.size.should == 2
            monitored.each { |b| b.failed?.should be_true }
            failed += 1
          end
        end

        # NOTE(review): @broker1 is not among the selected brokers; this stub is kept
        # from the original setup -- confirm whether the client still needs it
        @broker1.should_receive(:failed?).and_return(true)

        # First selected broker fails: no callback of any kind expected yet
        @broker2.should_receive(:failed?).and_return(true)
        @broker2.should_receive(:connected?).and_return(false)
        ha.__send__(:update_status, @broker2, false)
        connected.should == 0
        disconnected.should == 0
        failed.should == 0

        # Second selected broker fails: exactly one :failed callback expected
        @broker3.should_receive(:failed?).and_return(true)
        @broker3.should_receive(:connected?).and_return(false)
        ha.__send__(:update_status, @broker3, false)
        connected.should == 0
        disconnected.should == 0
        failed.should == 1
      end

    end # monitoring

    context "closing" do

      it "should close all broker connections and execute block after all connections are closed" do
        # Every broker must be closed exactly once with (false, Proc) and yield to that proc
        [@broker1, @broker2, @broker3].each do |broker|
          broker.should_receive(:close).with(false, Proc).and_return(true).and_yield.once
        end
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        invocations = 0
        client.close { invocations += 1 }
        # The block given to close must have run exactly once, not once per broker
        invocations.should == 1
      end

      it "should close broker connections when no block supplied" do
        # Even without a caller block each broker is expected to receive close(false, Proc) once
        [@broker1, @broker2, @broker3].each do |broker|
          broker.should_receive(:close).with(false, Proc).and_return(true).and_yield.once
        end
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        client.close
      end

      it "should close all broker connections even if encounter an exception" do
        # The failure of the second broker must be logged exactly once
        @logger.should_receive(:error).with(/Failed to close/).once
        # Only the second broker raises; the other two close normally and yield
        [@broker1, @broker3].each do |broker|
          broker.should_receive(:close).and_return(true).and_yield.once
        end
        @broker2.should_receive(:close).and_raise(Exception).once
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        invocations = 0
        client.close { invocations += 1 }
        # The caller's block must still run once despite the raised exception
        invocations.should == 1
      end

      it "should close an individual broker connection" do
        # With close_one's defaults the broker is expected to receive close(true) once
        @broker1.should_receive(:close).with(true).and_return(true).once
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        client.close_one(@identity1)
      end

      it "should not propagate connection status change if requested not to" do
        # The broker is expected to receive close(false) exactly once
        @broker1.should_receive(:close).with(false).and_return(true).once
        ha = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        # The second positional argument is presumably the propagate flag (the original
        # call spelled it `propagate = false`); pass the value directly so no stray
        # unused local variable is created in the example
        ha.close_one(@identity1, false)
      end

      it "should close an individual broker connection and execute block if given" do
        # When a block is supplied the broker is expected to receive close(true, Proc) once
        @broker1.should_receive(:close).with(true, Proc).and_return(true).and_yield.once
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        invocations = 0
        client.close_one(@identity1) { invocations += 1 }
        invocations.should == 1
      end

      it "should raise exception if unknown broker" do
        client = RightAMQP::HABrokerClient.new(@serializer, :host => "first, second, third")
        # "fourth" is not one of the three configured hosts
        closing_unknown = lambda { client.close_one("rs-broker-fourth-5672") }
        closing_unknown.should raise_error(Exception, /Cannot close unknown broker/)
      end

    end # closing

  end # when

end # RightAMQP::HABrokerClient