spec/angelo/websocket_spec.rb in angelo-0.1.2 vs spec/angelo/websocket_spec.rb in angelo-0.1.3
- old
+ new
@@ -1,16 +1,34 @@
require_relative '../spec_helper'
-CK = 'ANGELO_CONCURRENCY' # concurrency key
-DC = 5 # default concurrency
-
describe Angelo::WebsocketResponder do
- let(:concurrency){ ENV.key?(CK) ? ENV[CK].to_i : DC }
+ def socket_wait_for path, latch, expectation, key = :swf, &block
+ Reactor.testers[key] = Array.new(CONCURRENCY).map do
+ wsh = socket path
+ wsh.on_message = ->(e) {
+ expectation[e] if Proc === expectation
+ latch.count_down
+ }
+ wsh.init
+ wsh
+ end
+ action = (key.to_s + '_go').to_sym
+ Reactor.define_action action do |n|
+ every(0.01){ terminate if Reactor.stop? }
+ Reactor.testers[key][n].go
+ end
+ Reactor.unstop!
+ CONCURRENCY.times {|n| $reactor.async.__send__(action, n)}
- def socket_wait_for path, &block
- Array.new(concurrency).map {|n| Thread.new {socket path, &block}}
+ sleep 0.01 * CONCURRENCY
+ yield
+
+ Reactor.testers[key].map &:close
+ Reactor.stop!
+ Reactor.testers.delete key
+ Reactor.remove_action action
end
describe 'basics' do
define_app do
@@ -20,29 +38,70 @@
end
end
end
it 'responds on websockets properly' do
- socket '/' do |client|
- 5.times {|n|
- client.send "hi there #{n}"
- client.recv.should eq "hi there #{n}"
+ socket '/' do |wsh|
+ latch = CountDownLatch.new 500
+
+ wsh.on_message = ->(e) {
+ expect(e.data).to match(/hi there \d/)
+ latch.count_down
}
+
+ wsh.init
+ Reactor.testers[:tester] = wsh
+ Reactor.define_action :go do
+ every(0.01){ terminate if Reactor.stop? }
+ Reactor.testers[:tester].go
+ end
+ Reactor.unstop!
+ $reactor.async.go
+
+ 500.times {|n| wsh.text "hi there #{n}"}
+ latch.wait
+
+ Reactor.stop!
+ Reactor.testers.delete :tester
+ Reactor.remove_action :go
end
end
it 'responds on multiple websockets properly' do
- 5.times do
- Thread.new do
- socket '/' do |client|
- 5.times {|n|
- client.send "hi there #{n}"
- client.recv.should eq "hi there #{n}"
- }
- end
- end
+ latch = CountDownLatch.new CONCURRENCY * 500
+
+ Reactor.testers[:wshs] = Array.new(CONCURRENCY).map do
+ wsh = socket '/'
+ wsh.on_message = ->(e) {
+ expect(e.data).to match(/hi there \d/)
+ latch.count_down
+ }
+ wsh.init
+ wsh
end
+
+ Reactor.define_action :go do |n|
+ every(0.01){ terminate if Reactor.stop? }
+ Reactor.testers[:wshs][n].go
+ end
+ Reactor.unstop!
+ CONCURRENCY.times {|n| $reactor.async.go n}
+
+ sleep 0.01 * CONCURRENCY
+
+ ActorPool.define_action :go do |n|
+ 500.times {|x| Reactor.testers[:wshs][n].text "hi there #{x}"}
+ end
+ CONCURRENCY.times {|n| $pool.async.go n}
+ latch.wait
+
+ Reactor.testers[:wshs].map &:close
+ Reactor.stop!
+ Reactor.testers.delete :wshs
+ Reactor.remove_action :go
+
+ ActorPool.remove_action :go
end
end
describe 'concurrency' do
@@ -65,27 +124,28 @@
end
it 'works with http requests' do
- ts = socket_wait_for '/concur' do |client|
- Angelo::HTTPABLE.each do |m|
- client.recv.should eq "from http #{m}"
- end
- end
+ latch = CountDownLatch.new CONCURRENCY * Angelo::HTTPABLE.length
- sleep 0.1
- Angelo::HTTPABLE.each {|m| __send__ m, '/concur', foo: 'http'}
- ts.each &:join
+ expectation = ->(e){
+ expect(e.data).to match(/from http (#{Angelo::HTTPABLE.map(&:to_s).join('|')})/)
+ }
+ socket_wait_for '/concur', latch, expectation do
+ Angelo::HTTPABLE.each {|m| __send__ m, '/concur', foo: 'http'}
+ latch.wait
+ end
+
end
end
describe 'helper contexts' do
let(:obj){ {'foo' => 'bar'} }
- let(:wait_for_block){ ->(client){ JSON.parse(client.recv).should eq obj}}
+ let(:wait_for_block){ ->(e){ expect(JSON.parse(e.data)).to eq(obj) }}
define_app do
post '/' do
websockets.each {|ws| ws.write params.to_json}
@@ -124,40 +184,82 @@
end
end
it 'handles single context' do
- ts = socket_wait_for '/', &wait_for_block
- sleep 0.1
- post '/', obj
- ts.each &:join
+ latch = CountDownLatch.new CONCURRENCY
+ socket_wait_for '/', latch, wait_for_block do
+ post '/', obj
+ latch.wait
+ end
end
it 'handles multiple contexts' do
- ts = socket_wait_for '/', &wait_for_block
- one_ts = socket_wait_for '/one', &wait_for_block
- other_ts = socket_wait_for '/other', &wait_for_block
- sleep 0.1
- post '/one', obj
+ latch = CountDownLatch.new CONCURRENCY
- ts.each {|t| t.should be_alive}
- one_ts.each &:join
- other_ts.each {|t| t.should be_alive}
+ Reactor.testers[:hmc] = Array.new(CONCURRENCY).map do
+ wsh = socket '/'
+ wsh.on_message = ->(e) {
+ wait_for_block[e]
+ latch.count_down
+ }
+ wsh.init
+ wsh
+ end
- sleep 0.1
- post '/other', obj
+ one_latch = CountDownLatch.new CONCURRENCY
- ts.each {|t| t.should be_alive}
- one_ts.each {|t| t.should_not be_alive}
- other_ts.each &:join
+ Reactor.testers[:hmc_one] = Array.new(CONCURRENCY).map do
+ wsh = socket '/one'
+ wsh.on_message = ->(e) {
+ wait_for_block[e]
+ one_latch.count_down
+ }
+ wsh.init
+ wsh
+ end
- sleep 0.1
+ other_latch = CountDownLatch.new CONCURRENCY
+
+ Reactor.testers[:hmc_other] = Array.new(CONCURRENCY).map do
+ wsh = socket '/other'
+ wsh.on_message = ->(e) {
+ wait_for_block[e]
+ other_latch.count_down
+ }
+ wsh.init
+ wsh
+ end
+
+ Reactor.define_action :go do |k, n|
+ Reactor.testers[k][n].go
+ end
+ Reactor.unstop!
+ CONCURRENCY.times do |n|
+ [:hmc, :hmc_one, :hmc_other].each do |k|
+ $reactor.async.go k, n
+ end
+ end
+
+ sleep 0.01 * CONCURRENCY
+
+ post '/one', obj
+ one_latch.wait
post '/', obj
+ latch.wait
+ post '/other', obj
+ other_latch.wait
- ts.each &:join
- one_ts.each {|t| t.should_not be_alive}
- other_ts.each {|t| t.should_not be_alive}
+ [:hmc, :hmc_one, :hmc_other].each do |k|
+ Reactor.testers[k].map &:close
+ end
+ Reactor.stop!
+ [:hmc, :hmc_one, :hmc_other].each do |k|
+ Reactor.testers.delete k
+ end
+ Reactor.remove_action :go
end
end
+
end