# -*- coding: utf-8 -*-
require 'fluent/mixin/config_placeholders'
module Fluent
class SecureForwardOutput < ObjectBufferedOutput
end
end
require_relative 'output_node'
module Fluent
class SecureForwardOutput < ObjectBufferedOutput
DEFAULT_SECURE_CONNECT_PORT = 24284
Fluent::Plugin.register_output('secure_forward', self)
config_param :self_hostname, :string
include Fluent::Mixin::ConfigPlaceholders
config_param :shared_key, :string
config_param :keepalive, :time, :default => nil # nil/0 means disable keepalive
config_param :send_timeout, :time, :default => 60
# config_param :hard_timeout, :time, :default => 60
# config_param :expire_dns_cache, :time, :default => 0 # 0 means disable cache
config_param :allow_self_signed_certificate, :bool, :default => true
config_param :ca_file_path, :string, :default => nil
config_param :read_length, :size, :default => 512 # 512bytes
config_param :read_interval_msec, :integer, :default => 50 # 50ms
config_param :socket_interval_msec, :integer, :default => 200 # 200ms
config_param :reconnect_interval, :time, :default => 5
attr_reader :read_interval, :socket_interval
attr_reader :nodes
#
# host ipaddr/hostname
# hostlabel labelname # certification common name
# port 24284
# shared_key .... # optional shared key
# username name # if required
# password pass # if required
#
attr_reader :hostname_resolver
def initialize
super
require 'socket'
require 'openssl'
require 'digest'
require 'resolve/hostname'
end
def configure(conf)
super
unless @allow_self_signed_certificate
raise Fluent::ConfigError, "not tested yet!"
end
@read_interval = @read_interval_msec / 1000.0
@socket_interval = @socket_interval_msec / 1000.0
# read tags and set to nodes
@nodes = []
conf.elements.each do |element|
case element.name
when 'server'
unless element['host']
raise Fluent::ConfigError, "host missing in "
end
node_shared_key = element['shared_key'] || @shared_key
node = Node.new(self, node_shared_key, element)
node.first_session = true
node.keepalive = @keepalive
@nodes.push node
else
raise Fluent::ConfigError, "unknown config tag name #{element.name}"
end
end
@next_node = 0
@mutex = Mutex.new
@hostname_resolver = Resolve::Hostname.new(:system_resolver => true)
true
end
def select_node
tries = 0
nodes = @nodes.size
@mutex.synchronize {
n = nil
while tries <= nodes
n = @nodes[@next_node]
@next_node += 1
@next_node = 0 if @next_node >= nodes
return n if n && n.established?
tries += 1
end
nil
}
end
def start
super
$log.debug "starting secure-forward"
OpenSSL::Random.seed(File.read("/dev/urandom", 16))
$log.debug "start to connect target nodes"
@nodes.each do |node|
$log.debug "connecting node", :host => node.host, :port => node.port
node.start
end
@nodewatcher = Thread.new(&method(:node_watcher))
end
def node_watcher
loop do
sleep @reconnect_interval
$log.trace "in node health watcher"
(0...(@nodes.size)).each do |i|
$log.trace "node health watcher for #{@nodes[i].host}"
next if @nodes[i].established? && ! @nodes[i].expired?
$log.info "dead connection found: #{@nodes[i].host}, reconnecting..." unless @nodes[i].established?
node = @nodes[i]
$log.debug "reconnecting to node", :host => node.host, :port => node.port, :expire => node.expire, :expired => node.expired?
@nodes[i] = node.dup
@nodes[i].start
begin
node.shutdown
rescue => e
$log.warn "error in shutdown of dead connection", :error_class => e.class, :error => e
end
end
end
end
def shutdown
super
@nodewatcher.kill
@nodewatcher.join
@nodes.each do |node|
node.detach = true
node.join
end
end
def write_objects(tag, es)
#TODO: check errors
node = select_node
unless node
raise "no one nodes with valid ssl session"
end
begin
send_data(node, tag, es)
rescue IOError => e
$log.warn "Failed to send messages to #{node.host}, parging."
node.shutdown
end
end
# MessagePack FixArray length = 2
FORWARD_HEADER = [0x92].pack('C')
# to forward messages
def send_data(node, tag, es)
ssl = node.sslsession
# beginArray(2)
ssl.write FORWARD_HEADER
# writeRaw(tag)
ssl.write tag.to_msgpack
# beginRaw(size)
sz = es.size
# # FixRaw
# ssl.write [0xa0 | sz].pack('C')
#elsif sz < 65536
# # raw 16
# ssl.write [0xda, sz].pack('Cn')
#else
# raw 32
ssl.write [0xdb, sz].pack('CN')
#end
# writeRawBody(packed_es)
es.write_to(ssl)
end
end
end