lib/fluent/plugin/output_node.rb in fluent-plugin-secure-forward-0.2.2 vs lib/fluent/plugin/output_node.rb in fluent-plugin-secure-forward-0.2.3

- old
+ new

@@ -14,24 +14,27 @@ attr_accessor :first_session, :detach attr_reader :expire - def initialize(sender, shared_key, conf) + def initialize(sender, conf) @sender = sender - @shared_key = shared_key + @shared_key = conf.shared_key || sender.shared_key - @host = conf['host'] - @port = (conf['port'] || Fluent::SecureForwardOutput::DEFAULT_SECURE_CONNECT_PORT).to_i - @hostlabel = conf['hostlabel'] || conf['host'] - @username = conf['username'] || '' - @password = conf['password'] || '' - @standby = conf.has_key?('standby') && Fluent::Config.bool_value(conf['standby']) != false + @host = conf.host + @port = conf.port + @hostlabel = conf.hostlabel || conf.host + @username = conf.username + @password = conf.password + @standby = conf.standby + @keepalive = sender.keepalive + @authentication = nil - @keepalive = nil + @writing = false + @expire = nil @first_session = false @detach = false @socket = nil @@ -48,23 +51,42 @@ end def dup renewed = self.class.new( @sender, - @shared_key, - {'host' => @host, 'port' => @port, 'hostlabel' => @hostlabel, 'username' => @username, 'password' => @password} + Fluent::Config::Section.new({host: @host, port: @port, hostlabel: @hostlabel, username: @username, password: @password, shared_key: @shared_key, standby: @standby}) ) - renewed.keepalive = @keepalive if @keepalive renewed end def start @thread = Thread.new(&method(:connect)) ## If you want to check code bug, turn this line enable # @thread.abort_on_exception = true end + def detach! + @detach = true + end + + def detached? + @detach + end + + def tain! + raise RuntimeError, "BUG: taining detached node" if @detach + @writing = true + end + + def tained? + @writing + end + + def release! + @writing = false + end + def shutdown log.debug "shutting down node #{@host}" @state = :closed if @thread == Thread.current @@ -270,9 +292,12 @@ sleep socket_interval rescue EOFError log.warn "disconnected from #{@host}" break end + end + while @writing + sleep read_interval end self.shutdown end end