# -*- coding: utf-8 -*- require_relative '../../spec_helper' require_relative '../../../lib/tengine/mq/suite' # ログを黙らせたり喋らせたりする require 'amq/client' if $DEBUG require 'logger' AMQP::Session.logger = Tengine.logger = Logger.new(STDERR) else AMQP::Session.logger = Tengine.logger = Tengine::NullLogger.new end require 'amq/client/callbacks' describe Tengine::Mq::Suite do shared_examples "Tengine::Mq::Suite" do describe "#initialize" do context "no args" do subject { Tengine::Mq::Suite.new } its(:config) { should == { :sender => { :keep_connection => false, :retry_interval => 1, :retry_count => 30, }, :connection => { :user => 'guest', :pass => 'guest', :vhost => '/', :logging => false, :insist => false, :host => 'localhost', :port => 5672, :auto_reconnect_delay => 1, }, :channel => { :prefetch => 1, :auto_recovery => true, }, :exchange => { :name => 'tengine_event_exchange', :type => :direct, :passive => false, :durable => true, :auto_delete => false, :internal => false, :nowait => false, :publish => { :content_type => "application/json", :persistent => true, }, }, :queue => { :name => 'tengine_event_queue', :passive => false, :durable => true, :auto_delete => false, :exclusive => false, :nowait => false, :subscribe => { :ack => true, :nowait => false, :confirm => nil, }, }, } } end context "hash arg" do subject { Tengine::Mq::Suite.new :sender => { :keep_connection => false } } its(:config) { should have_key(:sender) } it "merges the argument" do subject.config[:sender][:keep_connection].should be_false subject.config[:sender].should have_key(:retry_interval) end end end describe "#config" do subject { Tengine::Mq::Suite.new.config } it { should be_kind_of(Hash) } it { should be_frozen } end describe "#add_hook" do subject { Tengine::Mq::Suite.new the_config } context "no arg" do it { expect { subject.add_hook }.to raise_error(ArgumentError) } end context "many arg" do it { expect { subject.add_hook :foo, :bar }.to raise_error(ArgumentError) } end context "no block" do it { expect { subject.add_hook :foo }.to raise_error(ArgumentError) } end context "one arg, one block" do it { expect { subject.add_hook(:foo){ } }.to_not raise_error(ArgumentError) } end context "connection.on_closed" do it "called" do block_called = false subject.add_hook "connection.on_closed" do block_called = true end EM.run do subject.subscribe {|x, y| } EM.add_timer(0.1) { subject.stop } end block_called.should be_true end end context "connection.on_tcp_connection_failure" do subject { port = the_config[:connection] ? the_config[:connection][:port] : 5672 Tengine::Mq::Suite.new :connection => { :port => port + rand(1024) } } it "https://www.pivotaltracker.com/story/show/18317933" do block_called = false subject.add_hook "connection.on_tcp_connection_failure" do block_called = true end expect { EM.run_block do subject.subscribe {|x, y| } EM.add_timer(0.2) { subject.stop } end }.to raise_exception block_called.should be_true end end # context "channel.on_error" ... # context "connection.after_recovery" ... it "hookを保持する" do mq = nil # 1st time yielded = 0 EM.run do mq = subject mq.send :ensures, :connection do mq.add_hook(:"connection.on_closed") do yielded += 1 end mq.stop end end # 2nd time yielded = 0 EM.run do mq.send :ensures, :connection do mq.stop end end yielded.should == 1 end it "hookを保持する #2" do mq = nil # 1st time yielded = 0 EM.run do mq = subject mq.send :ensures, :channel do mq.add_hook(:"channel.on_error") do yielded += 1 end mq.channel.exec_callback_once_yielding_self(:error, "channel close reason object") mq.stop end end # 2nd time yielded = 0 EM.run do mq.send :ensures, :channel do mq.channel.exec_callback_once_yielding_self(:error, "channel close reason object") mq.stop end end yielded.should == 1 end end describe "#subscribe" do subject { Tengine::Mq::Suite.new the_config } context "no block" do it { expect { subject.subscribe }.to raise_error(ArgumentError) } end context "no reactor, nowait: false" do it "raises" do block_called = false expect { subject.subscribe(:nowait => true) { block_called = true } }.to raise_error(RuntimeError) block_called.should be_false end end context "no reactor, nowait: true" do it "raises" do block_called = false expect { subject.subscribe(:nowait=>false) { block_called = true } }.to raise_error(RuntimeError) block_called.should be_false end end context "with reactor, with 1 message in queue" do it "runs the block" do block_called = false body = nil header = nil expect { EM.run do subject.subscribe do |hdr, bdy| block_called = true body = bdy header = hdr hdr.ack subject.stop end sender = Tengine::Event::Sender.new subject ev = Tengine::Event.new :event_type_name => "foo" subject.fire sender, ev, { :keep_connection => true }, nil end }.to_not raise_error(RuntimeError) block_called.should be_true header.should be_kind_of(AMQP::Header) body.should be_kind_of(String) body.should =~ /foo/ end end context "many messages in queue" do before do # キューにイベントがすでに溜まってるとおかしくなるので、吸い出しておく EM.run do i = 0 subject.subscribe do |hdr, bdy| hdr.ack i += 1 end EM.add_periodic_timer(0.1) do subject.stop if i.zero? i = 0 end end end it "runs the block every time" do block_called = 0 expect { EM.run do subject.subscribe do |hdr, bdy| block_called += 1 hdr.ack end sender = Tengine::Event::Sender.new subject EM.add_timer(0.1) { ev = Tengine::Event.new :event_type_name => "foo" subject.fire sender, ev, { :keep_connection => true }, nil EM.add_timer(0.1) { ev = Tengine::Event.new :event_type_name => "foo" subject.fire sender, ev, { :keep_connection => true }, nil EM.add_timer(0.1) { ev = Tengine::Event.new :event_type_name => "foo" subject.fire sender, ev, { :keep_connection => true }, nil EM.add_timer(0.3) { subject.stop } } } } end }.to_not raise_error(RuntimeError) block_called.should == 3 end end end describe "#unsubscribe" do subject { Tengine::Mq::Suite.new the_config } context "no block" do it { expect { subject.unsubscribe }.to raise_error(ArgumentError) } end context "no queue" do it "runs the block" do block_called = false expect { subject.unsubscribe { block_called = true subject.stop } }.to_not raise_error(RuntimeError) block_called.should be_true end end context "queue not subscribed" do it "runs the block" do block_called = false expect { EM.run do subject.send :ensures, :queue do subject.unsubscribe do block_called = true subject.stop end end end }.to_not raise_error(RuntimeError) block_called.should be_true end end context "subscribed" do it "runs the block" do block_called = false expect { EM.run do subject.subscribe :nowait => false, :confirm => lambda {|x| subject.unsubscribe {|y| block_called = true subject.stop } } do |hdr, bdy| hdr.ack end end }.to_not raise_error(RuntimeError) block_called.should be_true end end context "nowait:true" do it "runs the block" do block_called = false expect { EM.run do subject.subscribe :nowait => false, :confirm => lambda {|x| subject.unsubscribe({:nowait => true}) {|y| block_called = true subject.stop } } do |hdr, bdy| hdr.ack end end }.to_not raise_error(RuntimeError) block_called.should be_true end end end describe "#fire" do subject { Tengine::Mq::Suite.new the_config } def sender; Tengine::Event::Sender.new subject end def expected_event; Tengine::Event.new(:event_type_name => :foo, :key => "uniq_key") end context "without reactor" do it "raises" do block_called = false expect { ev = Tengine::Event.new :event_type_name => "foo" subject.fire sender, ev, { :keep_connection => true }, nil }.to raise_error(RuntimeError) block_called.should be_false end end context "with reactor" do after do # キューにイベントがたまるのでてきとうに吸い出す if @port EM.run do subject.send :ensures, :connection do i = 0 subject.subscribe do |hdr, bdy| hdr.ack i += 1 end @timer = EM.add_periodic_timer(0.1) do if i.zero? EM.cancel_timer @timer subject.unsubscribe do subject.stop end else i = 0 end end end end end end it "JSON形式にserializeしてexchangeにpublishする" do Thread.current[:expected_event] = expected_event EM.run do subject.send :ensures, :exchange do |xchg| def xchg.publish json, hash json.should == Thread.current[:expected_event].to_json super end subject.fire sender, Thread.current[:expected_event], {:keep_connection => false}, nil end end end it "Tengine::Eventオブジェクトを直接指定する" do # 上と同じ…過去には意味があった Thread.current[:expected_event] = expected_event EM.run do subject.send :ensures, :exchange do |xchg| def xchg.publish json, hash json.should == Thread.current[:expected_event].to_json super end subject.fire sender, Thread.current[:expected_event], {:keep_connection => false}, nil end end end context "publish後に特定の処理を行う" do it "カスタム処理" do block_called = false EM.run do sender.fire "foo" do block_called = true end end block_called.should be_true end end context "keep_connection: true" do it "do not stop the reactor" do block_called = false EM.run do sender.fire "foo", :keep_connection => true EM.add_timer(0.5) do block_called = true subject.stop end end block_called.should be_true end end context "keep_connection: false" do it "stop the reactor" do block_called = false EM.run do sender.fire "foo", :keep_connection => false EM.add_timer(1) do block_called = true subject.stop end end block_called.should be_false end end context "AMQP::TCPConnectionFailed 以外のエラー" do it "メッセージ送信ができなくてpublishに渡したブロックが呼び出されず、インターバルが過ぎて、EM.add_timeに渡したブロックが呼び出された場合" do block_called = false EM.run do subject.send :ensures, :exchange do |xchg| xchg.stub(:publish).with(expected_event.to_json, :content_type => "application/json", :persistent => true).exactly(31).times.and_raise(StandardError) subject.fire sender, expected_event, {:keep_connection => false, :retry_interval => 0}, lambda { block_called = true } subject.stop end end block_called.should_not be_true end it "エラーが発生しても設定のリトライが行われる" do EM.run do subject.send :ensures, :exchange do |xchg| # 正規のfireとリトライのfireなので、リトライ回数+1 xchg.stub(:publish).with(expected_event.to_json, :content_type => "application/json", :persistent => true).exactly(31).times.and_raise(StandardError) subject.fire sender, expected_event, {:keep_connection => false, :retry_interval => 0}, nil subject.stop end end end it "エラーが発生してもオプションで指定したリトライ回数分のリトライが行われる" do EM.run do subject.send :ensures, :exchange do |xchg| # 正規のfireとリトライのfireなので、リトライ回数+1 xchg.stub(:publish).with(expected_event.to_json, :content_type => "application/json", :persistent => true).exactly(2).times.and_raise(StandardError) subject.fire sender, expected_event, {:keep_connection => false, :retry_interval => 0, :retry_count => 1}, nil subject.stop end end end it "エラーが発生してもオプションで指定したリトライ間隔でリトライが行われる" do t0 = Time.now EM.run do subject.send :ensures, :exchange do |xchg| # 正規のfireとリトライのfireなので、リトライ回数+1 x = sender xchg.stub(:publish).with(expected_event.to_json, :content_type => "application/json", :persistent => true).exactly(3).times.and_raise(StandardError) subject.fire x, expected_event, {:keep_connection => false, :retry_interval => 1, :retry_count => 2}, nil subject.stop end end t1 = Time.now (t1 - t0).should be_within(1.0).of(2.0) end it "ちょうどretry_count回めのリトライして成功の場合は例外にならない" do block_called = false EM.run do subject.send :ensures, :exchange do |xchg| # 正規のfireとリトライのfireなので、リトライ回数+1 Thread.current[:expected_event] = expected_event Thread.current[:x] = false def xchg.publish str, hash if Thread.current[:x] = !Thread.current[:x] raise "foo" else super end end subject.fire sender, expected_event, {:keep_connection => false, :retry_interval => 0, :retry_count => 2}, lambda { block_called = true } subject.stop end end block_called.should be_true end end end context "複数のEM event loopにまたがったfire" do it "https://www.pivotaltracker.com/story/show/21252625" do EM.run do sender.fire("foo") end EM.run do sender.fire("foo") end # ここまでくればOK end end context "入り乱れたfireにおけるretryの回数" do it "https://www.pivotaltracker.com/story/show/20236589" do Thread.current[:n1] = 0 Thread.current[:n2] = 0 Thread.current[:ev1] = ev1 = Tengine::Event.new(:event_type_name => :foo, :key => "uniq_key") Thread.current[:ev2] = ev2 = Tengine::Event.new(:event_type_name => :foo, :key => "another_uniq_key") EM.run do subject.send :ensures, :exchange do |xchg| def xchg.publish json, hash case json when Thread.current[:ev1].to_json Thread.current[:n1] += 1 raise "ev1" when Thread.current[:ev2].to_json Thread.current[:n2] += 1 raise "ev2" end end subject.fire sender, ev1, {:keep_connection => true, :retry_interval => 0}, nil subject.fire sender, ev2, {:keep_connection => true, :retry_interval => 0}, nil subject.stop end end n1 = Thread.current[:n1] n2 = Thread.current[:n2] if n1 == 31 n2.should <= 31 n2.should >= 2 elsif n2 == 31 n1.should <= 31 n1.should >= 2 else raise "neither n1(#{n1}) nor n2(#{n2})" end end it "無限にメモリを消費しない" do n = 256 # 1024 # 4096 EM.run do subject.send :ensures, :exchange do |xchg| xchg.stub(:publish).with(an_instance_of(String), :content_type => "application/json", :persistent => true).exactly(31).times.and_raise(StandardError) n.times do EM.next_tick do ev = Tengine::Event.new(:event_type_name => :foo, :key => "uniq_key") subject.fire sender, ev, {:keep_connection => true, :retry_cont => 3, :retry_interval => 0}, nil end end EM.next_tick do subject.stop end end end GC.start subject.pending_events{true}.size.should < n end end end describe "#stop" do subject { Tengine::Mq::Suite.new the_config } let(:sender) { Tengine::Event::Sender.new subject } let(:expected_event) { Tengine::Event.new(:event_type_name => :foo, :key => "uniq_key") } after do # キューにイベントがたまるのでてきとうに吸い出す if @port EM.run do subject.send :ensures, :connection do i = 0 subject.subscribe do |hdr, bdy| hdr.ack i += 1 end EM.add_periodic_timer(0.1) do subject.stop if i.zero? i = 0 end end end end end it "EMのイベントループを抜ける" do EM.run do subject.stop end # ここに到達すればOK end it "ペンディングのイベントが送信されるまではEMのイベントループにとどまる" do block_called = false EM.run do subject.send :ensures, :exchange do |xchg| Thread.current[:expected_event] = expected_event Thread.current[:x] = false def xchg.publish str, hash if Thread.current[:x] = !Thread.current[:x] raise "foo" else super end end subject.fire sender, expected_event, {:keep_connection => false, :retry_interval => 0, :retry_count => 2}, lambda { block_called = true } subject.stop end end block_called.should be_true end end describe "#initiate_termination" do subject { Tengine::Mq::Suite.new the_config } after do # キューにイベントがたまるのでてきとうに吸い出す if @port EM.run do i = 0 j = false subject.subscribe :confirm=>proc{j = true} do |hdr, bdy| hdr.ack i += 1 end EM.add_periodic_timer(0.1) do if j subject.stop if i.zero? i = 0 end end end end end it "再接続しない" do block_called = false EM.run do subject.add_hook("connection.after_recovery") { block_called = true } subject.initiate_termination do EM.defer(proc { finish; trigger@port }, proc { subject.stop }) end end block_called.should_not be_true end end describe "#callback_entity" do subject { Tengine::Mq::Suite.new the_config } it "registers callbacks" do EM.run do subject.send :ensures, :queue do { :queue => [ :before_recovery, :after_recovery, :after_connection_interruption, ], :exchange => [ :before_recovery, :after_recovery, :after_connection_interruption, ], :channel => [ :before_recovery, :after_recovery, :after_connection_interruption, :error, ], :connection => [ :before_recovery, :after_recovery, :after_connection_interruption, :error, # :on_closed, # :on_possible_authentication_failure, # :on_tcp_connection_failure, # :on_tcp_connection_loss, ] }.each_pair do |klass, mids| obj = subject.send klass mids.each do |mid| case obj when AMQ::Client::Callbacks obj.should be_has_callback(mid) # 英語おかしいがしょうがない else # mockかも end end end subject.stop end end end end end context "実際にMQに接続する試験" do let(:rabbitmq) do ret = nil ENV["PATH"].split(/:/).find do |dir| Dir.glob("#{dir}/rabbitmq-server") do |path| if File.executable?(path) ret = path break end end end pending "these specs needs a rabbitmq installed" unless ret ret end def trigger port = rand(32768) raise "WRONG" if $_pid require 'tmpdir' $_dir = Dir.mktmpdir # 指定したポートはもう使われているかもしれないので、その際は # rabbitmqが起動に失敗するので、何回かポートを変えて試す。 n = 0 begin envp = { "RABBITMQ_NODENAME" => "rspec", "RABBITMQ_NODE_PORT" => port.to_s, "RABBITMQ_NODE_IP_ADDRESS" => "auto", "RABBITMQ_MNESIA_BASE" => $_dir.to_s, "RABBITMQ_LOG_BASE" => $_dir.to_s, } $_pid = Process.spawn(envp, rabbitmq, :chdir => $_dir, :in => :close) x = Time.now while Time.now < x + 16.0 do # まあこんくらい待てばいいでしょ sleep 0.1 Process.waitpid2($_pid, Process::WNOHANG) Process.kill 0, $_pid # netstat -an は Linux / BSD ともに有効 # どちらかに限ればもう少し効率的な探し方はある。たとえば Linux 限定でよければ netstat -lnt ... y = `netstat -an | fgrep LISTEN | fgrep #{port}` if y.lines.to_a.size >= 1 @port = port return end end pending "failed to invoke rabbitmq in 16 secs." rescue Errno::ECHILD, Errno::ESRCH if (n += 1) > 10 pending "10 attempts to invoke rabbitmq failed." else port = rand(32768) retry end end end def finish if $_pid begin Process.kill "INT", $_pid Process.waitpid $_pid rescue Errno::ECHILD, Errno::ESRCH ensure require 'fileutils' FileUtils.remove_entry_secure $_dir, :force end end $_pid = nil end before :all do pending "these specs needs a ruby 1.9.2" if RUBY_VERSION < "1.9.2" trigger end after :all do finish end let(:the_config) { { :connection => { :port => @port, }, } } it_should_behave_like "Tengine::Mq::Suite" end context "mock/stubによる試験" do def trigger *; end def finish end class Mocker class << self alias_method :[], :new end def initialize inspect @inspect = inspect end attr_reader :inspect end # for exchange class RSpec::Mocks::Mock def publish str, opt $the_messages.push str end end class RSpec::Mocks::MessageExpectation def and_emyield *val index = Object.new @emvals ||= Hash.new @emvals[index] = val def self.invoke_with_yield(&block) if block.nil? @error_generator.raise_missing_block_error @args_to_yield end value = nil @args_to_yield.each do |args_to_yield_this_time| if Array === args_to_yield_this_time[0] and @emvals.key?(args_to_yield_this_time[0][0]) value = EM.next_tick do block.yield(*args_to_yield_this_time[0][1]) end else if block.arity > -1 && args_to_yield_this_time.length != block.arity @error_generator.raise_wrong_arity_error args_to_yield_this_time, block.arity end value = eval_block(*args_to_yield_this_time, &block) end end value end and_yield([index, val]) end end before do $the_messages = EM::Queue.new @the_connection = mock(Mocker["connection"]) @the_channel_id = Numeric.new @the_channel = mock(Mocker["channel"]) @the_exchange = mock(Mocker["exchange"]) @the_queue = mock(Mocker["queue"]) @callbacks = Hash.new do |h, k| h[k] = Hash.new end AMQP.stub(:connect).with(an_instance_of(Hash)) do |h, block| EM.next_tick do block.yield @the_connection if h[:port] != 5672 @callbacks[@the_connection][:on_tcp_connection_failure].yield @the_connection if @callbacks[@the_connection][:on_tcp_connection_failure] raise AMQP::Error, "fake error." end end end AMQP::Channel.stub(:next_channel_id).and_return(@the_channel_id) AMQP::Channel.stub(:new).with(@the_connection, @the_channel_id, an_instance_of(Hash)).and_emyield(@the_channel).and_return(@the_channel) AMQP::Exchange.stub(:new).with(@the_channel, an_instance_of(Symbol), an_instance_of(String), an_instance_of(Hash)).and_emyield(@the_exchange).and_return(@the_exchange) @the_connection.stub(:connected?).and_return(true) @the_connection.stub(:disconnect) do |block| EM.next_tick do @callbacks[@the_connection][:on_closed].yield @the_connection if @callbacks[@the_connection][:on_closed] block.yield $the_messages = EM::Queue.new # reset end end @the_connection.stub(:server_capabilities).and_return(Hash.new) @the_connection.stub(:before_recovery) @the_connection.stub(:after_recovery) @the_connection.stub(:on_connection_interruption) @the_connection.stub(:on_closed) do |block| @callbacks[@the_connection][:on_closed] = block end @the_connection.stub(:on_possible_authentication_failure) @the_connection.stub(:on_tcp_connection_failure) do |block| @callbacks[@the_connection][:on_tcp_connection_failure] = block end @the_connection.stub(:on_tcp_connection_loss) @the_channel.stub(:queue).with(an_instance_of(String), an_instance_of(Hash)).and_emyield(@the_queue).and_return(@the_queue) @the_channel.stub(:close).and_emyield @the_channel.stub(:before_recovery) @the_channel.stub(:after_recovery) @the_channel.stub(:on_connection_interruption) @the_channel.stub(:on_error) do |block| @callbacks[@the_channel][:on_error] = block end @the_channel.stub(:exec_callback_once_yielding_self).with(:error, "channel close reason object") do EM.next_tick do @callbacks[@the_channel][:on_error].yield @the_channel if @callbacks[@the_channel][:on_error] end end @the_exchange.stub(:publish).with(an_instance_of(String), an_instance_of(Hash)) do |str, opt| $the_messages.push str end @the_exchange.stub(:before_recovery) @the_exchange.stub(:after_recovery) @the_exchange.stub(:on_connection_interruption) @the_queue.stub(:bind).with(@the_exchange, an_instance_of(Hash)).and_emyield @the_queue.stub(:subscribe).with(an_instance_of(Hash)) do |h, block| h[:confirm].call mock(Mocker["confirm-ok"]) if h[:confirm] and not h[:nowait] cb = lambda do |ev| header = AMQP::Header.new @the_channel, nil, Hash.new header.stub(:ack) block.yield header, ev $the_messages.pop(&cb) end EM.next_tick do $the_messages.pop(&cb) end end @the_queue.stub(:before_recovery) @the_queue.stub(:after_recovery) @the_queue.stub(:on_connection_interruption) @the_queue.stub(:default_consumer) end let(:the_config) { Hash.new } it_should_behave_like "Tengine::Mq::Suite" end end