lib/fluent/plugin/output_node.rb in fluent-plugin-secure-forward-0.0.4 vs lib/fluent/plugin/output_node.rb in fluent-plugin-secure-forward-0.1.0
- old
+ new
@@ -9,10 +9,14 @@
class Fluent::SecureForwardOutput::Node
attr_accessor :host, :port, :hostlabel, :shared_key, :username, :password
attr_accessor :authentication, :keepalive
attr_accessor :socket, :sslsession, :unpacker, :shared_key_salt, :state
+ attr_accessor :first_session, :detach
+
+ attr_reader :expire
+
def initialize(sender, shared_key, conf)
@sender = sender
@shared_key = shared_key
@host = conf['host']
@@ -20,11 +24,15 @@
@hostlabel = conf['hostlabel'] || conf['host']
@username = conf['username'] || ''
@password = conf['password'] || ''
@authentication = nil
+
@keepalive = nil
+ @expire = nil
+ @first_session = false
+ @detach = false
@socket = nil
@sslsession = nil
@unpacker = MessagePack::Unpacker.new
@@ -63,14 +71,26 @@
end
rescue => e
$log.debug "error on node shutdown #{e.class}:#{e.message}"
end
+ def join
+ @thread && @thread.join
+ end
+
def established?
@state == :established
end
+ def expired?
+ if @keepalive.nil? || @keepalive == 0
+ false
+ else
+ @expire && @expire < Time.now
+ end
+ end
+
def generate_salt
OpenSSL::Random.random_bytes(16)
end
def check_helo(message)
@@ -79,11 +99,11 @@
unless message.size == 2 && message[0] == 'HELO'
return false
end
opts = message[1]
@authentication = opts['auth']
- @keepalive = opts['keepalive']
+ @allow_keepalive = opts['keepalive']
true
end
def generate_ping
$log.debug "generating ping"
@@ -133,11 +153,10 @@
return
end
case @state
when :helo
- # TODO: log debug
unless check_helo(data)
$log.warn "received invalid helo message from #{@host}"
self.shutdown
return
end
@@ -148,12 +167,14 @@
unless success
$log.warn "connection refused to #{@host}:" + reason
self.shutdown
return
end
- $log.info "connection established to #{@host}"
+ $log.info "connection established to #{@host}" if @first_session
@state = :established
+ @expire = Time.now + @keepalive if @keepalive && @keepalive > 0
+ $log.debug "connection established", :host => @host, :port => @port, :expire => @expire
end
end
def connect
$log.debug "starting client"
@@ -207,9 +228,11 @@
read_length = @sender.read_length
read_interval = @sender.read_interval
socket_interval = @sender.socket_interval
loop do
+ break if @detach
+
begin
while @sslsession.read_nonblock(read_length, buf)
if buf == ''
sleep read_interval
next