# encoding: utf-8

require 'spec_helper'
require 'integration/eventmachine/spec_helper'

include PlatformDetection

# This spec is only defined when PlatformDetection#mri? is true.
if mri?
  require "multi_json"

  # Integration spec: publishes to a single exchange from many Ruby threads at
  # once and fails if the connection reports a connection-level error while
  # doing so.
  describe AMQ::Client::EventMachineClient, "Basic.Publish" do
    include EventedSpec::SpecHelper
    default_timeout 21.0

    context "when messages are published across threads" do
      # Fixture documents that get JSON-encoded into message payloads.
      # The entries alternate between an :index header hash and a stats hash.
      let(:inputs) do
        [
          { :index => { :_routing => 530, :_index => "optimizer", :_type => "earnings", :_id => 530 } },
          { :total_conversions => 0, :banked_clicks => 0, :total_earnings => 0, :pending_conversions => 0,
            :paid_net_earnings => 0, :banked_conversions => 0, :pending_earnings => 0, :optimizer_id => 530,
            :total_impressions => 0, :banked_earnings => 0, :bounce_count => 0, :time_on_page => 0,
            :total_clicks => 0, :entrances => 0, :pending_clicks => 0, :paid_earnings => 0 },
          { :index => { :_routing => 430, :_index => "optimizer", :_type => "earnings", :_id => 430 } },
          { :total_conversions => 1443, :banked_clicks => 882, :total_earnings => 5796.3315841537,
            :pending_conversions => 22, :paid_net_earnings => 4116.90224486802, :banked_conversions => 1086,
            :pending_earnings => 257.502767857143, :optimizer_id => 430, :total_impressions => 6370497,
            :banked_earnings => 122.139339285714, :bounce_count => 6825, :time_on_page => 0,
            :total_clicks => 38143, :entrances => 12336, :pending_clicks => 1528,
            :paid_earnings => 5670.78224486798 },
          { :index => { :_routing => 506, :_index => "optimizer", :_type => "earnings", :_id => 506 } },
          { :total_conversions => 237, :banked_clicks => 232, :total_earnings => 550.6212071428588277,
            :pending_conversions => 9, :paid_net_earnings => 388.021207142857, :banked_conversions => 225,
            :pending_earnings => 150.91, :optimizer_id => 506, :total_impressions => 348319,
            :banked_earnings => 12.92, :bounce_count => 905, :time_on_page => 0,
            :total_clicks => 4854, :entrances => 1614, :pending_clicks => 1034,
            :paid_earnings => 537.501207142858 },
          { :index => { :_routing => 345, :_index => "optimizer", :_type => "earnings", :_id => 345 } },
          { :total_conversions => 0, :banked_clicks => 0, :total_earnings => 0, :pending_conversions => 0,
            :paid_net_earnings => 0, :banked_conversions => 0, :pending_earnings => 0, :optimizer_id => 345,
            :total_impressions => 0, :banked_earnings => 0, :bounce_count => 0, :time_on_page => 0,
            :total_clicks => 0, :entrances => 0, :pending_clicks => 0, :paid_earnings => 0 }
        ]
      end

      # Every input encoded as JSON, with the resulting list repeated three
      # times (8 inputs -> 24 payloads).
      let(:messages) { inputs.map { |input| MultiJson.encode(input) } * 3 }

      # The purpose of this example is to make sure UNEXPECTED_FRAME issues are gone.
      it "synchronizes on channel" do
        @received_messages = []

        em_amqp_connect do |client|
          # Any connection-level error fails the example outright.
          client.on_error do |_conn, connection_close|
            fail "Handling a connection-level exception: #{connection_close.reply_text}"
          end

          channel = AMQ::Client::Channel.new(client, 1)
          channel.open do
            # NOTE(review): the positional flags passed to #declare and #consume
            # are defined by the AMQ::Client::Queue API, not here -- confirm
            # their meaning against that API before changing them.
            queue = AMQ::Client::Queue.new(client, channel).declare(false, false, false, true)
            queue.bind("amq.fanout")

            queue.consume(true) do |_consume_ok|
              # Collect every delivered payload; only the count is checked later.
              queue.on_delivery do |_method, _header, payload|
                @received_messages << payload
              end

              exchange = AMQ::Client::Exchange.new(client, channel, "amq.fanout", :fanout)

              EventMachine.add_timer(2.0) do
                # 30 threads publish the full message list concurrently through
                # the same exchange object. The threads are deliberately not
                # joined: the example simply lets them run until `done` fires.
                30.times do
                  Thread.new do
                    messages.each do |message|
                      exchange.publish(message, queue.name, {}, false, false)
                    end
                  end
                end
              end
            end

            # Let it run for several seconds because concurrency issues do not
            # always manifest themselves immediately. MK.
            done(14.0) {
              # We don't care about the exact number, just the fact that there are
              # no UNEXPECTED_FRAME connection-level exceptions. MK.
              @received_messages.size.should > 120
            }
          end
        end # em_amqp_connect
      end # it
    end # context
  end # describe
end