lib/fluent/plugin/output_node.rb in fluent-plugin-secure-forward-0.0.4 vs lib/fluent/plugin/output_node.rb in fluent-plugin-secure-forward-0.1.0

- old
+ new

@@ -9,10 +9,14 @@ class Fluent::SecureForwardOutput::Node attr_accessor :host, :port, :hostlabel, :shared_key, :username, :password attr_accessor :authentication, :keepalive attr_accessor :socket, :sslsession, :unpacker, :shared_key_salt, :state + attr_accessor :first_session, :detach + + attr_reader :expire + def initialize(sender, shared_key, conf) @sender = sender @shared_key = shared_key @host = conf['host'] @@ -20,11 +24,15 @@ @hostlabel = conf['hostlabel'] || conf['host'] @username = conf['username'] || '' @password = conf['password'] || '' @authentication = nil + @keepalive = nil + @expire = nil + @first_session = false + @detach = false @socket = nil @sslsession = nil @unpacker = MessagePack::Unpacker.new @@ -63,14 +71,26 @@ end rescue => e $log.debug "error on node shutdown #{e.class}:#{e.message}" end + def join + @thread && @thread.join + end + def established? @state == :established end + def expired? + if @keepalive.nil? || @keepalive == 0 + false + else + @expire && @expire < Time.now + end + end + def generate_salt OpenSSL::Random.random_bytes(16) end def check_helo(message) @@ -79,11 +99,11 @@ unless message.size == 2 && message[0] == 'HELO' return false end opts = message[1] @authentication = opts['auth'] - @keepalive = opts['keepalive'] + @allow_keepalive = opts['keepalive'] true end def generate_ping $log.debug "generating ping" @@ -133,11 +153,10 @@ return end case @state when :helo - # TODO: log debug unless check_helo(data) $log.warn "received invalid helo message from #{@host}" self.shutdown return end @@ -148,12 +167,14 @@ unless success $log.warn "connection refused to #{@host}:" + reason self.shutdown return end - $log.info "connection established to #{@host}" + $log.info "connection established to #{@host}" if @first_session @state = :established + @expire = Time.now + @keepalive if @keepalive && @keepalive > 0 + $log.debug "connection established", :host => @host, :port => @port, :expire => @expire end end def connect $log.debug "starting client" @@ -207,9 +228,11 @@ read_length = @sender.read_length read_interval = @sender.read_interval socket_interval = @sender.socket_interval loop do + break if @detach + begin while @sslsession.read_nonblock(read_length, buf) if buf == '' sleep read_interval next