Sha256: d695c5eb9bcda7a414ee8b2b4842c51c3e55d88e02672967f7f7f3201eb5ddb1
Contents?: true
Size: 1.27 KB
Versions: 2
Compression:
Stored size: 1.27 KB
Contents
require 'spec_helper' require 'message_bus' describe MessageBus::ReliablePubSub do def new_bus MessageBus::ReliablePubSub.new(:db => 10) end def work_it bus = new_bus $stdout.reopen("/dev/null", "w") $stderr.reopen("/dev/null", "w") # subscribe blocks, so we need a new bus to transmit new_bus.subscribe("/echo", 0) do |msg| bus.publish("/response", Process.pid.to_s) end end def spawn_child r = fork if r.nil? work_it else r end end it 'gets every response from child processes' do Redis.new(:db => 10).flushdb begin pids = (1..10).map{spawn_child} responses = [] bus = MessageBus::ReliablePubSub.new(:db => 10) Thread.new do bus.subscribe("/response", 0) do |msg| responses << msg if pids.include? msg.data.to_i end end 10.times{bus.publish("/echo", Process.pid.to_s)} wait_for 4000 do responses.count == 100 end # p responses.group_by(&:data).map{|k,v|[k, v.count]} # p responses.group_by(&:global_id).map{|k,v|[k, v.count]} responses.count.should == 100 ensure if pids pids.each do |pid| Process.kill("KILL", pid) Process.wait(pid) end end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
message_bus-1.0.11 | spec/lib/multi_process_spec.rb |
message_bus-1.0.10 | spec/lib/multi_process_spec.rb |