# -*- encoding: utf-8 -*-

# This is meant to provide basic testing of the full IO of an OnStomp client.
# It is not designed to handle more than one client at a time.
class TestBroker
  # Raised inside the listener thread by #stop to break out of the accept loop.
  class StopThread < StandardError; end

  attr_reader :messages, :sessions

  # Binds a TCP server on +port+ and prepares empty broker state.
  # A bind failure is reported on $stdout rather than raised, leaving
  # @socket unset.
  #
  # @param port [Integer] TCP port to listen on
  def initialize(port = 61613)
    init_state
    @port = port
    begin
      @socket = TCPServer.new @port
    rescue StandardError => ex
      $stdout.puts "Could not bind: #{ex}"
    end
    @session_class = Session10
  end

  # Kills every tracked session and forgets them.
  def kill_sessions
    @session_mutex.synchronize do
      @sessions.each do |s|
        s.kill
      end
      @sessions.clear
    end
  end

  # Turns a received SEND frame into a MESSAGE frame. The message goes to the
  # first subscriber of its destination if there is one; otherwise it is
  # queued until somebody subscribes.
  #
  # @param s [OnStomp::Components::Frame] the SEND frame
  def enqueue_message(s)
    @sub_mutex.synchronize do
      msg = OnStomp::Components::Frame.new 'MESSAGE', {}, s.body
      s.headers.each do |k, v|
        msg.headers.append k, v
      end
      msg[:'message-id'] = "msg-#{Time.now.to_f}"
      dest = msg[:destination]
      subscriber = @subscribes[dest].first
      if subscriber
        session, subid = subscriber
        deliver_message msg, session, subid
      else
        @messages[dest] << msg
      end
    end
  end

  # Tags +msg+ with the subscription id and transmits it through +sess+.
  def deliver_message(msg, sess, subid)
    msg[:subscription] = subid
    sess.transmit msg
  end

  # @return [Array] the MESSAGE frames still queued for +dest+
  def messages_for(dest)
    @messages[dest]
  end

  # @return [Array] the bodies of the frames still queued for +dest+
  def bodies_for(dest)
    messages_for(dest).map { |m| m.body }
  end

  # Registers +session+ as a subscriber for the frame's destination and
  # immediately delivers everything queued for that destination.
  #
  # @param f [OnStomp::Components::Frame] the SUBSCRIBE frame
  # @param session [Session10] the session that received it
  def subscribe(f, session)
    @sub_mutex.synchronize do
      dest = f[:destination]
      @subscribes[dest] << [session, f[:id]]
      until @messages[dest].empty?
        msg = @messages[dest].shift
        deliver_message msg, session, f[:id]
      end
    end
  end

  # Removes the subscription matching +session+ and the frame's id.
  # When the frame carries no destination header (allowed by STOMP 1.1, where
  # only +id+ is required) every destination is searched instead of silently
  # doing nothing.
  # NOTE(review): assumes f[:destination] is nil when the header is absent.
  #
  # @param f [OnStomp::Components::Frame] the UNSUBSCRIBE frame
  # @param session [Session10] the session that received it
  def unsubscribe(f, session)
    @sub_mutex.synchronize do
      dest = f[:destination]
      candidates = dest ? [@subscribes[dest]] : @subscribes.values
      candidates.each do |subs|
        subs.reject! do |pair|
          pair.first == session && pair.last == f[:id]
        end
      end
    end
  end

  # Spawns the listener thread, which accepts connections and wraps each one
  # in a new session. Note that the session constructor performs the
  # CONNECT/CONNECTED handshake, so it runs on the listener thread.
  def start
    @listener = Thread.new do
      begin
        loop do
          sess = @session_class.new(self, @socket.accept)
          @session_mutex.synchronize do
            @sessions << sess
          end
        end
      rescue StopThread
        # Shutdown requested through #stop; exit quietly.
      rescue StandardError => ex
        $stdout.puts "Listener failed: #{ex}"
        stop
      end
    end
  end

  # Shuts the broker down: interrupts the listener, closes the server socket
  # and stops every tracked session. Each step is best-effort so that one
  # failure (listener never started, socket never bound, #stop invoked from
  # the listener thread itself) does not prevent the remaining cleanup.
  def stop
    quietly { @listener.raise(StopThread.new) }
    quietly { @listener.join }
    quietly { @socket.close }
    session_snapshot.each do |s|
      quietly { s.stop }
    end
  end

  # Waits for every tracked session's processor to finish.
  def join
    # Iterate over a copy: the listener thread may append while we wait.
    session_snapshot.each do |s|
      s.join
    end
  end

  private

  # Creates the bookkeeping shared by every broker flavour. Kept separate from
  # #initialize so subclasses that build their own server socket can reuse it.
  def init_state
    @sessions = []
    @messages = Hash.new { |h, k| h[k] = [] }
    @subscribes = Hash.new { |h, k| h[k] = [] }
    @sub_mutex = Mutex.new
    @session_mutex = Mutex.new
  end

  # @return [Array] a copy of the session list taken under the session mutex
  def session_snapshot
    @session_mutex.synchronize { @sessions.dup }
  end

  # Runs the block and discards any StandardError it raises.
  #
  # @return the block's value, or nil when it raised
  def quietly
    yield
  rescue StandardError
    nil
  end

  public

  # Server side of a single STOMP 1.0 client connection.
  class Session10
    include OnStomp::Interfaces::ClientEvents

    attr_reader :connection, :socket

    # Wires up events and the connection, then performs the handshake: reads
    # until a first frame arrives, queues a CONNECTED frame, writes until it
    # has been sent, and finally starts the processor.
    #
    # @param server [TestBroker] broker that owns this session
    # @param sock the accepted client socket
    def initialize(server, sock)
      @server = server
      @socket = sock
      init_events
      init_connection
      # CONNECT/CONNECTED handshake
      connect_frame = connected_frame = nil
      until connect_frame
        @connection.io_process_read do |f|
          connect_frame ||= f
        end
      end
      transmit OnStomp::Components::Frame.new('CONNECTED')
      until connected_frame
        @connection.io_process_write do |f|
          connected_frame ||= f
        end
      end
      @processor.start
    end

    # Builds the protocol connection and the threaded processor driving it.
    def init_connection
      @connection = OnStomp::Connections::Stomp_1_0.new(socket, self)
      @processor = OnStomp::Components::ThreadedProcessor.new self
    end

    def connected?
      @connection.connected?
    end

    # Registers the handlers that forward client frames to the broker.
    def init_events
      on_subscribe do |s, _|
        @server.subscribe s, self
      end

      on_unsubscribe do |u, _|
        @server.unsubscribe u, self
      end

      on_send do |s, _|
        @server.enqueue_message s
      end

      on_disconnect do |_d, _|
        @connection.close
      end

      # Acknowledge any frame that asks for a receipt.
      before_receiving do |f, _|
        if f.header? :receipt
          transmit OnStomp::Components::Frame.new('RECEIPT',
            :'receipt-id' => f[:receipt])
        end
      end
    end

    def dispatch_transmitted(f)
      trigger_after_transmitting f
    end

    def dispatch_received(f)
      trigger_before_receiving f
      trigger_after_receiving f
    end

    # Fires the before-transmitting event and queues +frame+ for writing.
    #
    # @return the frame that was passed in
    def transmit(frame)
      frame.tap do
        trigger_before_transmitting frame
        connection.write_frame_nonblock(frame)
      end
    end

    # Waits for the processor, but only while the connection is still up.
    def join
      if @connection.connected?
        #@connection.close
        @processor.join
      end
    end

    # Abrupt shutdown: closes the raw socket, then stops the processor.
    def kill
      @socket.close
      @processor.stop
    end

    # Orderly shutdown; does nothing once the connection is already closed.
    def stop
      if @connection.connected?
        @connection.close
        @processor.stop
      end
    end
  end

  # Same as Session10 but speaks STOMP 1.1.
  class Session11 < Session10
    def init_connection
      @connection = OnStomp::Connections::Stomp_1_1.new(socket, self)
      @processor = OnStomp::Components::ThreadedProcessor.new self
    end
  end

  class StompErrorOnConnectSession < Session10
  end
end

# TestBroker variant that accepts connections over SSL.
class TestSSLBroker < TestBroker
  SSL_CERT_FILES = {
    :default => {
      :c => File.expand_path('../ssl/broker_cert.pem', __FILE__),
      :k => File.expand_path('../ssl/broker_key.pem', __FILE__)
    }
  }.freeze

  # Binds a TCP server on +port+ and wraps it in an SSL server using the
  # certificate/key pair registered under +certs+ in SSL_CERT_FILES.
  # Unlike TestBroker#initialize, a bind failure is raised to the caller.
  #
  # @param port [Integer] TCP port to listen on
  # @param certs [Symbol] key into SSL_CERT_FILES
  def initialize(port = 61612, certs = :default)
    # super is deliberately not called (it would bind a second plain TCP
    # server on the same port), so the shared state is set up explicitly.
    init_state
    @port = port
    @tcp_socket = TCPServer.new(@port)
    @ssl_context = OpenSSL::SSL::SSLContext.new
    @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
    cert_files = SSL_CERT_FILES[certs]
    @ssl_context.key = OpenSSL::PKey::RSA.new(File.read(cert_files[:k]))
    @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(cert_files[:c]))
    @socket = OpenSSL::SSL::SSLServer.new(@tcp_socket, @ssl_context)
    @socket.start_immediately = true
    @session = nil
    @version = '1.1'
    # StompSession is not defined in this file. Use it when some other file
    # provides it; otherwise fall back to the 1.1 session matching @version.
    # NOTE(review): confirm Session11 is the intended default here.
    @session_class = defined?(StompSession) ? StompSession : Session11
    @sent_frames = []
    @received_frames = []
  end
end