spec/onstomp/full_stacks/test_broker.rb in onstomp-1.0.3 vs spec/onstomp/full_stacks/test_broker.rb in onstomp-1.0.4

- old
+ new

@@ -5,11 +5,11 @@ class TestBroker class StopThread < StandardError; end attr_reader :messages, :sessions, :mau_dib attr_reader :frames_received, :frames_transmitted - attr_accessor :session_class + attr_accessor :session_class, :accept_delay def initialize(port=61613) @frames_received = [] @frames_transmitted = [] @sessions = [] @@ -22,10 +22,11 @@ @socket = TCPServer.new @port rescue Exception => ex $stdout.puts "Could not bind: #{ex}" end @session_class = Session10 + @accept_delay = nil end def kill_on_command command @mau_dib = command end @@ -36,19 +37,17 @@ s.headers.each do |k,v| msg.headers.append k, v end msg[:'message-id'] = "msg-#{Time.now.to_f}" dest = msg[:destination] - #$stdout.puts "Enqueuing on #{dest}" if !@subscribes[dest].empty? session, subid = @subscribes[dest].first deliver_message msg, session, subid else @messages[dest] << msg end end - #$stdout.puts "Done enqueueing!" end def deliver_message msg, sess, subid msg[:subscription] = subid sess.transmit msg @@ -86,10 +85,11 @@ def start @listener = Thread.new do begin loop do sock = @socket.accept + @accept_delay && sleep(@accept_delay) @session_mutex.synchronize do @sessions << @session_class.new(self, sock) end end rescue StopThread @@ -131,10 +131,16 @@ end def init_connection @connection = OnStomp::Connections::Stomp_1_0.new(socket, self) @processor = OnStomp::Components::ThreadedProcessor.new self + @killing = false + @session_killer = Thread.new do + Thread.pass until @killing + @socket.close rescue nil + @processor.stop rescue nil + end end def reply_to_connect connect_frame connected_frame = nil transmit OnStomp::Components::Frame.new('CONNECTED') @@ -148,41 +154,39 @@ @connection.connected? end def init_events on_subscribe do |s,_| - #$stdout.puts "Got SUBSCRIBE: #{s.headers.to_hash.inspect}" - @server.subscribe s, self + @server.subscribe(s, self) unless @killing end on_unsubscribe do |u, _| - #$stdout.puts "Got UNSUBSCRIBE: #{u.headers.to_hash.inspect}" - @server.unsubscribe u, self + @server.unsubscribe(u, self) unless @killing end on_send do |s,_| - #$stdout.puts "Got a SEND frame! #{s.body.inspect}" - @server.enqueue_message s + @server.enqueue_message(s) unless @killing end on_disconnect do |d,_| - @connection.close + #@connection.close end after_transmitting do |f,_| @server.frames_transmitted << f end before_receiving do |f,_| - @server.frames_received << f + @server.frames_received << f unless @killing if @server.mau_dib && f.command == @server.mau_dib kill + elsif !@killing + if f.header? :receipt + transmit OnStomp::Components::Frame.new('RECEIPT', + :'receipt-id' => f[:receipt]) + end end - if f.header? :receipt - transmit OnStomp::Components::Frame.new('RECEIPT', - :'receipt-id' => f[:receipt]) - end end end def dispatch_transmitted f trigger_after_transmitting f @@ -201,16 +205,15 @@ end def join if @connection.connected? #@connection.close - @processor.join + @processor.join rescue nil end end def kill - @socket.close - @processor.stop + @killing = true end def stop if @connection.connected? @connection.close