lib/logstash/util/relp.rb in logstash-input-relp-0.1.5 vs lib/logstash/util/relp.rb in logstash-input-relp-2.0.1

- old
+ new

@@ -98,11 +98,11 @@ end class RelpServer < Relp - def initialize(host,port,required_commands=[]) + def initialize(host,port,required_commands=[],ssl_context=nil) @logger = Cabin::Channel.get(LogStash) @server=true #These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.) @@ -116,11 +116,14 @@ rescue Errno::EADDRINUSE @logger.error("Could not start RELP server: Address in use", :host => host, :port => port) raise end - @logger.info? and @logger.info("Started RELP Server", :host => host, :port => port) + if ssl_context + @server = OpenSSL::SSL::SSLServer.new(@server, ssl_context) + end + @logger.info("Started #{ssl_context ? 'SSL-enabled ' : ''}RELP Server", :host => host, :port => port) end def accept socket = @server.accept @logger.debug("New socket created") @@ -208,126 +211,8 @@ frame = Hash.new frame['txnr'] = txnr frame['command'] = 'rsp' frame['message'] = '200 OK' self.frame_write(socket, frame) - end - -end - -#This is only used by the tests; any problems here are not as important as elsewhere -class RelpClient < Relp - - def initialize(host,port,required_commands = [],buffer_size = 128, - retransmission_timeout=10) - @logger = Cabin::Channel.get(LogStash) - @logger.info? and @logger.info("Starting RELP client", :host => host, :port => port) - @server = false - @buffer = Hash.new - - @buffer_size = buffer_size - @retransmission_timeout = retransmission_timeout - - #These are things that are part of the basic protocol, but only valid in one direction (rsp, close etc.) - @basic_relp_commands = ['serverclose','rsp']#TODO: check for others - - #These are extra commands that we require, otherwise refuse the connection - @required_relp_commands = required_commands - - @socket=TCPSocket.new(host,port) - - #This'll start the automatic frame numbering - @lasttxnr = 0 - - offer=Hash.new - offer['command'] = 'open' - offer['message'] = 'relp_version=' + RelpVersion + "\n" - offer['message'] += 'relp_software=' + RelpSoftware + "\n" - offer['message'] += 'commands=' + @required_relp_commands.join(',')#TODO: add optional ones - self.frame_write(@socket, offer) - response_frame = self.frame_read(@socket) - if response_frame['message'][0,3] != '200' - raise RelpError,response_frame['message'] - end - - response=Hash[*response_frame['message'][7..-1].scan(/^(.*)=(.*)$/).flatten] - if response['relp_version'].nil? - #if no version specified, relp spec says we must close connection - self.close() - raise RelpError, 'No relp_version specified; offer: ' - + response_frame['message'][6..-1].scan(/^(.*)=(.*)$/).flatten - - #subtracting one array from the other checks to see if all elements in @required_relp_commands are present in the offer - elsif ! (@required_relp_commands - response['commands'].split(',')).empty? - #if it can't receive syslog it's useless to us; close the connection - self.close() - raise InsufficientCommands, response['commands'] + ' offered, require ' - + @required_relp_commands.join(',') - end - #If we've got this far with no problems, we're good to go - @logger.info? and @logger.info("Connection establish with server") - - #This thread deals with responses that come back - reader = Thread.start do - loop do - f = self.frame_read(@socket) - if f['command'] == 'rsp' && f['message'] == '200 OK' - @buffer.delete(f['txnr']) - elsif f['command'] == 'rsp' && f['message'][0,1] == '5' - #TODO: What if we get an error for something we're already retransmitted due to timeout? - new_txnr = self.frame_write(@socket, @buffer[f['txnr']]) - @buffer[new_txnr] = @buffer[f['txnr']] - @buffer.delete(f['txnr']) - elsif f['command'] == 'serverclose' || f['txnr'] == @close_txnr - break - else - #Don't know what's going on if we get here, but it can't be good - raise RelpError#TODO: raising errors like this makes no sense - end - end - end - - #While this one deals with frames for which we get no reply - Thread.start do - old_buffer = Hash.new - loop do - #This returns old txnrs that are still present - (@buffer.keys & old_buffer.keys).each do |txnr| - new_txnr = self.frame_write(@socket, @buffer[txnr]) - @buffer[new_txnr] = @buffer[txnr] - @buffer.delete(txnr) - end - old_buffer = @buffer - sleep @retransmission_timeout - end - end - end - - #TODO: have a way to get back unacked messages on close - def close - frame = Hash.new - frame['command'] = 'close' - @close_txnr=self.frame_write(@socket, frame) - #TODO: ought to properly wait for a reply etc. The serverclose will make it work though - sleep @retransmission_timeout - @socket.close#TODO: shutdown? - return @buffer - end - - def syslog_write(logline) - - #If the buffer is already full, wait until a gap opens up - sleep 0.1 until @buffer.length<@buffer_size - - frame = Hash.new - frame['command'] = 'syslog' - frame['message'] = logline - - txnr = self.frame_write(@socket, frame) - @buffer[txnr] = frame - end - - def nexttxnr - @lasttxnr += 1 end end