lib/fluent/plugin/output_node.rb in fluent-plugin-secure-forward-0.3.4 vs lib/fluent/plugin/output_node.rb in fluent-plugin-secure-forward-0.3.5pre1
- old
+ new
@@ -30,10 +30,11 @@
@standby = conf.standby
@proxy_uri = conf.proxy_uri
@keepalive = sender.keepalive
+ @connection_hard_timeout = sender.connection_hard_timeout
@authentication = nil
@writing = false
@@ -45,10 +46,11 @@
@sslsession = nil
@unpacker = MessagePack::Unpacker.new
@shared_key_salt = generate_salt
@state = :helo
+ @mtime = Time.now
@thread = nil
end
def log
@sender.log
@@ -135,10 +137,11 @@
end
opts = message[1]
@shared_key_nonce = opts['nonce'] || '' # make shared_key_check failed (instead of error) if protocol version mismatch exist
@authentication = opts['auth']
@allow_keepalive = opts['keepalive']
+ @mtime = Time.now
true
end
def generate_ping
log.debug "generating ping"
@@ -150,10 +153,11 @@
password_hexdigest = Digest::SHA512.new.update(@authentication).update(@username).update(@password).hexdigest
ping.push(@username, password_hexdigest)
else
ping.push('','')
end
+ @mtime = Time.now
ping
end
def check_pong(message)
log.debug "checking pong"
@@ -175,14 +179,16 @@
clientside = Digest::SHA512.new.update(@shared_key_salt).update(hostname).update(@shared_key_nonce).update(@shared_key).hexdigest
unless shared_key_hexdigest == clientside
return false, 'shared key mismatch'
end
+ @mtime = Time.now
return true, nil
end
def send_data(data)
+ @mtime = Time.now
@sslsession.write data.to_msgpack
end
def on_read(data)
log.debug "on_read"
@@ -198,10 +204,11 @@
log.warn "received invalid helo message from #{@host}"
self.shutdown
return
end
send_data generate_ping()
+ @mtime = Time.now
@state = :pingpong
when :pingpong
success, reason = check_pong(data)
unless success
log.warn "connection refused to #{@host}:" + reason
@@ -209,10 +216,11 @@
return
end
log.info "connection established to #{@host}" if @first_session
@state = :established
@expire = Time.now + @keepalive if @keepalive && @keepalive > 0
+ @mtime = Time.now
log.debug "connection established", host: @host, port: @port, expire: @expire
end
end
def connect
@@ -280,10 +288,11 @@
log.debug "trying to connect ssl session", host: @host, address: addr, port: @port
begin
sslsession = OpenSSL::SSL::SSLSocket.new(sock, context)
log.trace "connecting...", host: @host, address: addr, port: @port
sslsession.connect
+ @mtime = Time.now
rescue => e
log.warn "failed to establish SSL connection", error_class: e.class, error: e, host: @host, address: addr, port: @port
@state = :failed
return
end
@@ -315,24 +324,37 @@
buf = ''
read_length = @sender.read_length
read_interval = @sender.read_interval
socket_interval = @sender.socket_interval
+ @mtime = Time.now
+
loop do
break if @detach
+ break if Time.now > @mtime + @connection_hard_timeout
begin
while @sslsession.read_nonblock(read_length, buf)
if buf == ''
sleep read_interval
next
end
@unpacker.feed_each(buf, &method(:on_read))
+ @mtime = Time.now
buf = ''
end
- rescue OpenSSL::SSL::SSLError
+ rescue OpenSSL::SSL::SSLError => e
# to wait i/o restart
- sleep socket_interval
+ log.trace "SSLError", error_class: e.class, error: e, mtime: @mtime, host: @host, port: @port
+ if Time.now > @mtime + @connection_hard_timeout
+ log.warn "connection hard timeout", mtime: @mtime, timeout: @connection_hard_timeout, host: @host, port: @port
+ log.warn "aborting connection", host: @host, port: @port
+ self.release!
+ self.detach!
+ break
+ else
+ sleep socket_interval
+ end
rescue SystemCallError => e
log.warn "disconnected by Error", error_class: e.class, error: e, host: @host, port: @port
self.release!
self.detach!
break