spec/onstomp/full_stacks/test_broker.rb in onstomp-1.0.3 vs spec/onstomp/full_stacks/test_broker.rb in onstomp-1.0.4
- old
+ new
@@ -5,11 +5,11 @@
class TestBroker
class StopThread < StandardError; end
attr_reader :messages, :sessions, :mau_dib
attr_reader :frames_received, :frames_transmitted
- attr_accessor :session_class
+ attr_accessor :session_class, :accept_delay
def initialize(port=61613)
@frames_received = []
@frames_transmitted = []
@sessions = []
@@ -22,10 +22,11 @@
@socket = TCPServer.new @port
rescue Exception => ex
$stdout.puts "Could not bind: #{ex}"
end
@session_class = Session10
+ @accept_delay = nil
end
def kill_on_command command
@mau_dib = command
end
@@ -36,19 +37,17 @@
s.headers.each do |k,v|
msg.headers.append k, v
end
msg[:'message-id'] = "msg-#{Time.now.to_f}"
dest = msg[:destination]
- #$stdout.puts "Enqueuing on #{dest}"
if !@subscribes[dest].empty?
session, subid = @subscribes[dest].first
deliver_message msg, session, subid
else
@messages[dest] << msg
end
end
- #$stdout.puts "Done enqueueing!"
end
def deliver_message msg, sess, subid
msg[:subscription] = subid
sess.transmit msg
@@ -86,10 +85,11 @@
def start
@listener = Thread.new do
begin
loop do
sock = @socket.accept
+ @accept_delay && sleep(@accept_delay)
@session_mutex.synchronize do
@sessions << @session_class.new(self, sock)
end
end
rescue StopThread
@@ -131,10 +131,16 @@
end
def init_connection
@connection = OnStomp::Connections::Stomp_1_0.new(socket, self)
@processor = OnStomp::Components::ThreadedProcessor.new self
+ @killing = false
+ @session_killer = Thread.new do
+ Thread.pass until @killing
+ @socket.close rescue nil
+ @processor.stop rescue nil
+ end
end
def reply_to_connect connect_frame
connected_frame = nil
transmit OnStomp::Components::Frame.new('CONNECTED')
@@ -148,41 +154,39 @@
@connection.connected?
end
def init_events
on_subscribe do |s,_|
- #$stdout.puts "Got SUBSCRIBE: #{s.headers.to_hash.inspect}"
- @server.subscribe s, self
+ @server.subscribe(s, self) unless @killing
end
on_unsubscribe do |u, _|
- #$stdout.puts "Got UNSUBSCRIBE: #{u.headers.to_hash.inspect}"
- @server.unsubscribe u, self
+ @server.unsubscribe(u, self) unless @killing
end
on_send do |s,_|
- #$stdout.puts "Got a SEND frame! #{s.body.inspect}"
- @server.enqueue_message s
+ @server.enqueue_message(s) unless @killing
end
on_disconnect do |d,_|
- @connection.close
+ #@connection.close
end
after_transmitting do |f,_|
@server.frames_transmitted << f
end
before_receiving do |f,_|
- @server.frames_received << f
+ @server.frames_received << f unless @killing
if @server.mau_dib && f.command == @server.mau_dib
kill
+ elsif !@killing
+ if f.header? :receipt
+ transmit OnStomp::Components::Frame.new('RECEIPT',
+ :'receipt-id' => f[:receipt])
+ end
end
- if f.header? :receipt
- transmit OnStomp::Components::Frame.new('RECEIPT',
- :'receipt-id' => f[:receipt])
- end
end
end
def dispatch_transmitted f
trigger_after_transmitting f
@@ -201,16 +205,15 @@
end
def join
if @connection.connected?
#@connection.close
- @processor.join
+ @processor.join rescue nil
end
end
def kill
- @socket.close
- @processor.stop
+ @killing = true
end
def stop
if @connection.connected?
@connection.close