lib/fluent/plugin/output_node.rb in fluent-plugin-secure-forward-0.3.4 vs lib/fluent/plugin/output_node.rb in fluent-plugin-secure-forward-0.3.5pre1

- old
+ new

@@ -30,10 +30,11 @@ @standby = conf.standby @proxy_uri = conf.proxy_uri @keepalive = sender.keepalive + @connection_hard_timeout = sender.connection_hard_timeout @authentication = nil @writing = false @@ -45,10 +46,11 @@ @sslsession = nil @unpacker = MessagePack::Unpacker.new @shared_key_salt = generate_salt @state = :helo + @mtime = Time.now @thread = nil end def log @sender.log @@ -135,10 +137,11 @@ end opts = message[1] @shared_key_nonce = opts['nonce'] || '' # make shared_key_check failed (instead of error) if protocol version mismatch exist @authentication = opts['auth'] @allow_keepalive = opts['keepalive'] + @mtime = Time.now true end def generate_ping log.debug "generating ping" @@ -150,10 +153,11 @@ password_hexdigest = Digest::SHA512.new.update(@authentication).update(@username).update(@password).hexdigest ping.push(@username, password_hexdigest) else ping.push('','') end + @mtime = Time.now ping end def check_pong(message) log.debug "checking pong" @@ -175,14 +179,16 @@ clientside = Digest::SHA512.new.update(@shared_key_salt).update(hostname).update(@shared_key_nonce).update(@shared_key).hexdigest unless shared_key_hexdigest == clientside return false, 'shared key mismatch' end + @mtime = Time.now return true, nil end def send_data(data) + @mtime = Time.now @sslsession.write data.to_msgpack end def on_read(data) log.debug "on_read" @@ -198,10 +204,11 @@ log.warn "received invalid helo message from #{@host}" self.shutdown return end send_data generate_ping() + @mtime = Time.now @state = :pingpong when :pingpong success, reason = check_pong(data) unless success log.warn "connection refused to #{@host}:" + reason @@ -209,10 +216,11 @@ return end log.info "connection established to #{@host}" if @first_session @state = :established @expire = Time.now + @keepalive if @keepalive && @keepalive > 0 + @mtime = Time.now log.debug "connection established", host: @host, port: @port, expire: @expire end end def connect @@ -280,10 +288,11 @@ log.debug "trying to connect ssl session", host: @host, address: addr, port: @port begin sslsession = OpenSSL::SSL::SSLSocket.new(sock, context) log.trace "connecting...", host: @host, address: addr, port: @port sslsession.connect + @mtime = Time.now rescue => e log.warn "failed to establish SSL connection", error_class: e.class, error: e, host: @host, address: addr, port: @port @state = :failed return end @@ -315,24 +324,37 @@ buf = '' read_length = @sender.read_length read_interval = @sender.read_interval socket_interval = @sender.socket_interval + @mtime = Time.now + loop do break if @detach + break if Time.now > @mtime + @connection_hard_timeout begin while @sslsession.read_nonblock(read_length, buf) if buf == '' sleep read_interval next end @unpacker.feed_each(buf, &method(:on_read)) + @mtime = Time.now buf = '' end - rescue OpenSSL::SSL::SSLError + rescue OpenSSL::SSL::SSLError => e # to wait i/o restart - sleep socket_interval + log.trace "SSLError", error_class: e.class, error: e, mtime: @mtime, host: @host, port: @port + if Time.now > @mtime + @connection_hard_timeout + log.warn "connection hard timeout", mtime: @mtime, timeout: @connection_hard_timeout, host: @host, port: @port + log.warn "aborting connection", host: @host, port: @port + self.release! + self.detach! + break + else + sleep socket_interval + end rescue SystemCallError => e log.warn "disconnected by Error", error_class: e.class, error: e, host: @host, port: @port self.release! self.detach! break