lib/fluent/plugin/output_node.rb in fluent-plugin-secure-forward-0.2.2 vs lib/fluent/plugin/output_node.rb in fluent-plugin-secure-forward-0.2.3
- old
+ new
@@ -14,24 +14,27 @@
attr_accessor :first_session, :detach
attr_reader :expire
- def initialize(sender, shared_key, conf)
+ def initialize(sender, conf)
@sender = sender
- @shared_key = shared_key
+ @shared_key = conf.shared_key || sender.shared_key
- @host = conf['host']
- @port = (conf['port'] || Fluent::SecureForwardOutput::DEFAULT_SECURE_CONNECT_PORT).to_i
- @hostlabel = conf['hostlabel'] || conf['host']
- @username = conf['username'] || ''
- @password = conf['password'] || ''
- @standby = conf.has_key?('standby') && Fluent::Config.bool_value(conf['standby']) != false
+ @host = conf.host
+ @port = conf.port
+ @hostlabel = conf.hostlabel || conf.host
+ @username = conf.username
+ @password = conf.password
+ @standby = conf.standby
+ @keepalive = sender.keepalive
+
@authentication = nil
- @keepalive = nil
+ @writing = false
+
@expire = nil
@first_session = false
@detach = false
@socket = nil
@@ -48,23 +51,42 @@
end
def dup
renewed = self.class.new(
@sender,
- @shared_key,
- {'host' => @host, 'port' => @port, 'hostlabel' => @hostlabel, 'username' => @username, 'password' => @password}
+ Fluent::Config::Section.new({host: @host, port: @port, hostlabel: @hostlabel, username: @username, password: @password, shared_key: @shared_key, standby: @standby})
)
- renewed.keepalive = @keepalive if @keepalive
renewed
end
def start
@thread = Thread.new(&method(:connect))
## If you want to check code bug, turn this line enable
# @thread.abort_on_exception = true
end
+ def detach!
+ @detach = true
+ end
+
+ def detached?
+ @detach
+ end
+
+ def tain!
+ raise RuntimeError, "BUG: taining detached node" if @detach
+ @writing = true
+ end
+
+ def tained?
+ @writing
+ end
+
+ def release!
+ @writing = false
+ end
+
def shutdown
log.debug "shutting down node #{@host}"
@state = :closed
if @thread == Thread.current
@@ -270,9 +292,12 @@
sleep socket_interval
rescue EOFError
log.warn "disconnected from #{@host}"
break
end
+ end
+ while @writing
+ sleep read_interval
end
self.shutdown
end
end