spec/angelo/websocket_spec.rb in angelo-0.1.2 vs spec/angelo/websocket_spec.rb in angelo-0.1.3

- old
+ new

@@ -1,16 +1,34 @@ require_relative '../spec_helper' -CK = 'ANGELO_CONCURRENCY' # concurrency key -DC = 5 # default concurrency - describe Angelo::WebsocketResponder do - let(:concurrency){ ENV.key?(CK) ? ENV[CK].to_i : DC } + def socket_wait_for path, latch, expectation, key = :swf, &block + Reactor.testers[key] = Array.new(CONCURRENCY).map do + wsh = socket path + wsh.on_message = ->(e) { + expectation[e] if Proc === expectation + latch.count_down + } + wsh.init + wsh + end + action = (key.to_s + '_go').to_sym + Reactor.define_action action do |n| + every(0.01){ terminate if Reactor.stop? } + Reactor.testers[key][n].go + end + Reactor.unstop! + CONCURRENCY.times {|n| $reactor.async.__send__(action, n)} - def socket_wait_for path, &block - Array.new(concurrency).map {|n| Thread.new {socket path, &block}} + sleep 0.01 * CONCURRENCY + yield + + Reactor.testers[key].map &:close + Reactor.stop! + Reactor.testers.delete key + Reactor.remove_action action end describe 'basics' do define_app do @@ -20,29 +38,70 @@ end end end it 'responds on websockets properly' do - socket '/' do |client| - 5.times {|n| - client.send "hi there #{n}" - client.recv.should eq "hi there #{n}" + socket '/' do |wsh| + latch = CountDownLatch.new 500 + + wsh.on_message = ->(e) { + expect(e.data).to match(/hi there \d/) + latch.count_down } + + wsh.init + Reactor.testers[:tester] = wsh + Reactor.define_action :go do + every(0.01){ terminate if Reactor.stop? } + Reactor.testers[:tester].go + end + Reactor.unstop! + $reactor.async.go + + 500.times {|n| wsh.text "hi there #{n}"} + latch.wait + + Reactor.stop! + Reactor.testers.delete :tester + Reactor.remove_action :go end end it 'responds on multiple websockets properly' do - 5.times do - Thread.new do - socket '/' do |client| - 5.times {|n| - client.send "hi there #{n}" - client.recv.should eq "hi there #{n}" - } - end - end + latch = CountDownLatch.new CONCURRENCY * 500 + + Reactor.testers[:wshs] = Array.new(CONCURRENCY).map do + wsh = socket '/' + wsh.on_message = ->(e) { + expect(e.data).to match(/hi there \d/) + latch.count_down + } + wsh.init + wsh end + + Reactor.define_action :go do |n| + every(0.01){ terminate if Reactor.stop? } + Reactor.testers[:wshs][n].go + end + Reactor.unstop! + CONCURRENCY.times {|n| $reactor.async.go n} + + sleep 0.01 * CONCURRENCY + + ActorPool.define_action :go do |n| + 500.times {|x| Reactor.testers[:wshs][n].text "hi there #{x}"} + end + CONCURRENCY.times {|n| $pool.async.go n} + latch.wait + + Reactor.testers[:wshs].map &:close + Reactor.stop! + Reactor.testers.delete :wshs + Reactor.remove_action :go + + ActorPool.remove_action :go end end describe 'concurrency' do @@ -65,27 +124,28 @@ end it 'works with http requests' do - ts = socket_wait_for '/concur' do |client| - Angelo::HTTPABLE.each do |m| - client.recv.should eq "from http #{m}" - end - end + latch = CountDownLatch.new CONCURRENCY * Angelo::HTTPABLE.length - sleep 0.1 - Angelo::HTTPABLE.each {|m| __send__ m, '/concur', foo: 'http'} - ts.each &:join + expectation = ->(e){ + expect(e.data).to match(/from http (#{Angelo::HTTPABLE.map(&:to_s).join('|')})/) + } + socket_wait_for '/concur', latch, expectation do + Angelo::HTTPABLE.each {|m| __send__ m, '/concur', foo: 'http'} + latch.wait + end + end end describe 'helper contexts' do let(:obj){ {'foo' => 'bar'} } - let(:wait_for_block){ ->(client){ JSON.parse(client.recv).should eq obj}} + let(:wait_for_block){ ->(e){ expect(JSON.parse(e.data)).to eq(obj) }} define_app do post '/' do websockets.each {|ws| ws.write params.to_json} @@ -124,40 +184,82 @@ end end it 'handles single context' do - ts = socket_wait_for '/', &wait_for_block - sleep 0.1 - post '/', obj - ts.each &:join + latch = CountDownLatch.new CONCURRENCY + socket_wait_for '/', latch, wait_for_block do + post '/', obj + latch.wait + end end it 'handles multiple contexts' do - ts = socket_wait_for '/', &wait_for_block - one_ts = socket_wait_for '/one', &wait_for_block - other_ts = socket_wait_for '/other', &wait_for_block - sleep 0.1 - post '/one', obj + latch = CountDownLatch.new CONCURRENCY - ts.each {|t| t.should be_alive} - one_ts.each &:join - other_ts.each {|t| t.should be_alive} + Reactor.testers[:hmc] = Array.new(CONCURRENCY).map do + wsh = socket '/' + wsh.on_message = ->(e) { + wait_for_block[e] + latch.count_down + } + wsh.init + wsh + end - sleep 0.1 - post '/other', obj + one_latch = CountDownLatch.new CONCURRENCY - ts.each {|t| t.should be_alive} - one_ts.each {|t| t.should_not be_alive} - other_ts.each &:join + Reactor.testers[:hmc_one] = Array.new(CONCURRENCY).map do + wsh = socket '/one' + wsh.on_message = ->(e) { + wait_for_block[e] + one_latch.count_down + } + wsh.init + wsh + end - sleep 0.1 + other_latch = CountDownLatch.new CONCURRENCY + + Reactor.testers[:hmc_other] = Array.new(CONCURRENCY).map do + wsh = socket '/other' + wsh.on_message = ->(e) { + wait_for_block[e] + other_latch.count_down + } + wsh.init + wsh + end + + Reactor.define_action :go do |k, n| + Reactor.testers[k][n].go + end + Reactor.unstop! + CONCURRENCY.times do |n| + [:hmc, :hmc_one, :hmc_other].each do |k| + $reactor.async.go k, n + end + end + + sleep 0.01 * CONCURRENCY + + post '/one', obj + one_latch.wait post '/', obj + latch.wait + post '/other', obj + other_latch.wait - ts.each &:join - one_ts.each {|t| t.should_not be_alive} - other_ts.each {|t| t.should_not be_alive} + [:hmc, :hmc_one, :hmc_other].each do |k| + Reactor.testers[k].map &:close + end + Reactor.stop! + [:hmc, :hmc_one, :hmc_other].each do |k| + Reactor.testers.delete k + end + Reactor.remove_action :go end end + end