#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/plugin/input'
require 'fluent/msgpack_factory'

require 'yajl'
require 'digest'
require 'ipaddr'
require 'securerandom'
require 'socket'

module Fluent::Plugin
  # Input plugin registered as 'forward'.
  #
  # It listens on a TCP port for event streams (JSON or MessagePack encoded)
  # and on the same UDP port for heartbeat packets. When a <security> section
  # is configured, every TCP connection must pass a HELO / PING / PONG
  # handshake before events are accepted.
  class ForwardInput < Input
    Fluent::Plugin.register_input('forward', self)

    # See the wiki page below for protocol specification
    # https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1

    helpers :server

    LISTEN_PORT = 24224

    desc 'The port to listen to.'
    config_param :port, :integer, default: LISTEN_PORT
    desc 'The bind address to listen to.'
    config_param :bind, :string, default: '0.0.0.0'

    config_param :backlog, :integer, default: nil
    # SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src
    desc 'The timeout time used to set linger option.'
    config_param :linger_timeout, :integer, default: 0
    # This option is for Cool.io's loop wait timeout to avoid loop stuck at shutdown. Most users don't need to change this value.
    config_param :blocking_timeout, :time, default: 0.5
    desc 'Try to resolve hostname from IP addresses or not.'
    config_param :resolve_hostname, :bool, default: nil
    desc 'Connections will be disconnected right after receiving first message if this value is true.'
    config_param :deny_keepalive, :bool, default: false
    desc 'Check the remote connection is still available by sending a keepalive packet if this value is true.'
    config_param :send_keepalive_packet, :bool, default: false

    desc 'Log warning if received chunk size is larger than this value.'
    config_param :chunk_size_warn_limit, :size, default: nil
    desc 'Received chunk is dropped if it is larger than this value.'
    config_param :chunk_size_limit, :size, default: nil
    desc 'Skip an event if incoming event is invalid.'
    config_param :skip_invalid_event, :bool, default: false

    desc "The field name of the client's source address."
    config_param :source_address_key, :string, default: nil
    desc "The field name of the client's hostname."
    config_param :source_hostname_key, :string, default: nil

    desc "New tag instead of incoming tag"
    config_param :tag, :string, default: nil
    desc "Add prefix to incoming tag"
    config_param :add_tag_prefix, :string, default: nil

    config_section :security, required: false, multi: false do
      desc 'The hostname'
      config_param :self_hostname, :string
      desc 'Shared key for authentication'
      config_param :shared_key, :string, secret: true
      desc 'If true, use user based authentication'
      config_param :user_auth, :bool, default: false
      desc 'Allow anonymous source. <client> sections required if disabled.'
      config_param :allow_anonymous_source, :bool, default: true

      ### User based authentication
      config_section :user, param_name: :users, required: false, multi: true do
        desc 'The username for authentication'
        config_param :username, :string
        desc 'The password for authentication'
        config_param :password, :string, secret: true
      end

      ### Client ip/network authentication & per_host shared key
      config_section :client, param_name: :clients, required: false, multi: true do
        desc 'The IP address or host name of the client'
        config_param :host, :string, default: nil
        desc 'Network address specification'
        config_param :network, :string, default: nil
        desc 'Shared key per client'
        config_param :shared_key, :string, default: nil, secret: true
        desc 'Array of username.'
        config_param :users, :array, default: []
      end
    end

    # Validates the configuration and precomputes derived state:
    # @resolve_hostname, @enable_field_injection and, when <security> is
    # present, @nodes (one entry per <client> section).
    #
    # @param conf the configuration element handed over by Fluentd
    # @raise [Fluent::ConfigError] on contradictory or incomplete settings
    def configure(conf)
      super

      if @source_hostname_key
        # TODO: add test
        if @resolve_hostname.nil?
          @resolve_hostname = true
        elsif !@resolve_hostname # user specifies "false" in config
          raise Fluent::ConfigError, "resolve_hostname must be true with source_hostname_key"
        end
      end
      @enable_field_injection = @source_address_key || @source_hostname_key

      raise Fluent::ConfigError, "'tag' parameter must not be empty" if @tag && @tag.empty?
      raise Fluent::ConfigError, "'add_tag_prefix' parameter must not be empty" if @add_tag_prefix && @add_tag_prefix.empty?

      if @security
        if @security.user_auth && @security.users.empty?
          raise Fluent::ConfigError, "<user> sections required if user_auth enabled"
        end
        if !@security.allow_anonymous_source && @security.clients.empty?
          raise Fluent::ConfigError, "<client> sections required if allow_anonymous_source disabled"
        end

        @nodes = []

        @security.clients.each do |client|
          if client.host && client.network
            raise Fluent::ConfigError, "both of 'host' and 'network' are specified for client"
          end
          if !client.host && !client.network
            raise Fluent::ConfigError, "Either of 'host' and 'network' must be specified for client"
          end

          # A host name is resolved once, at configuration time.
          source = nil
          if client.host
            begin
              source = IPSocket.getaddress(client.host)
            rescue SocketError
              raise Fluent::ConfigError, "host '#{client.host}' cannot be resolved"
            end
          end

          source_addr = begin
                          IPAddr.new(source || client.network)
                        rescue ArgumentError
                          raise Fluent::ConfigError, "network '#{client.network}' address format is invalid"
                        end

          @nodes.push({
            address: source_addr,
            shared_key: (client.shared_key || @security.shared_key),
            users: client.users
          })
        end
      end

      if @send_keepalive_packet && @deny_keepalive
        raise Fluent::ConfigError, "both 'send_keepalive_packet' and 'deny_keepalive' cannot be set to true"
      end
    end

    # @return [true] this plugin declares itself usable with multiple workers
    def multi_workers_ready?
      true
    end

    HEARTBEAT_UDP_PAYLOAD = "\0".freeze

    # Creates the TCP connection server (handled by #handle_connection) and
    # the UDP heartbeat server, both on @port / @bind. The sockets are shared
    # when more than one worker is configured.
    def start
      super

      shared_socket = system_config.workers > 1

      log.info "listening port", port: @port, bind: @bind
      server_create_connection(
        :in_forward_server, @port,
        bind: @bind,
        shared: shared_socket,
        resolve_name: @resolve_hostname,
        linger_timeout: @linger_timeout,
        send_keepalive_packet: @send_keepalive_packet,
        backlog: @backlog,
        &method(:handle_connection)
      )

      server_create(:in_forward_server_udp_heartbeat, @port, shared: shared_socket, proto: :udp, bind: @bind, resolve_name: @resolve_hostname, max_bytes: 128) do |data, sock|
        log.trace "heartbeat udp data arrived", host: sock.remote_host, port: sock.remote_port, data: data
        begin
          sock.write HEARTBEAT_UDP_PAYLOAD
        rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR => e
          # Best effort: a heartbeat reply that cannot be written is only traced.
          log.trace "error while heartbeat response", host: sock.remote_host, error: e
        end
      end
    end

    # Per-connection callback. Runs a small state machine:
    # :helo -> :pingpong (only when <security> is configured) -> :established.
    #
    # @param conn the connection object supplied by the server helper
    def handle_connection(conn)
      send_data = ->(serializer, data){ conn.write serializer.call(data) }

      log.trace "connected fluent socket", addr: conn.remote_addr, port: conn.remote_port
      state = :established
      nonce = nil
      user_auth_salt = nil

      if @security
        # security enabled session MUST use MessagePack as serialization format
        state = :helo
        nonce = generate_salt
        user_auth_salt = generate_salt
        send_data.call(:to_msgpack.to_proc, generate_helo(nonce, user_auth_salt))
        state = :pingpong
      end

      log.trace "accepted fluent socket", addr: conn.remote_addr, port: conn.remote_port

      read_messages(conn) do |msg, chunk_size, serializer|
        case state
        when :pingpong
          success, reason_or_salt, shared_key = check_ping(msg, conn.remote_addr, user_auth_salt, nonce)
          unless success
            # Send the failure PONG, then close once it has been written.
            conn.on(:write_complete) { |c| c.close_after_write_complete }
            send_data.call(serializer, generate_pong(false, reason_or_salt, nonce, shared_key))
            next
          end
          send_data.call(serializer, generate_pong(true, reason_or_salt, nonce, shared_key))

          log.debug "connection established", address: conn.remote_addr, port: conn.remote_port
          state = :established
        when :established
          options = on_message(msg, chunk_size, conn)
          if options && r = response(options)
            log.trace "sent response to fluent socket", address: conn.remote_addr, response: r
            conn.on(:write_complete) { |c| c.close } if @deny_keepalive
            send_data.call(serializer, r)
          else
            if @deny_keepalive
              conn.close
            end
          end
        else
          raise "BUG: unknown session state: #{state}"
        end
      end
    end

    # Registers a data callback on +conn+ that decodes the incoming byte
    # stream and yields every decoded object.
    #
    # The format is chosen from the first byte received on the connection:
    # '{' or '[' selects JSON (Yajl), anything else selects MessagePack.
    #
    # @param conn the connection to read from
    # @yield [obj, bytes, serializer] decoded object, number of bytes received
    #   since the previous object was yielded, and the proc used to serialize
    #   replies in the same format
    def read_messages(conn, &block)
      feeder = nil
      serializer = nil
      bytes = 0
      conn.data do |data|
        # only for first call of callback
        unless feeder
          first = data[0]
          if first == '{' || first == '[' # json
            parser = Yajl::Parser.new
            parser.on_parse_complete = ->(obj){
              block.call(obj, bytes, serializer)
              bytes = 0
            }
            serializer = :to_json.to_proc
            feeder = ->(d){ parser << d }
          else # msgpack
            parser = Fluent::MessagePackFactory.msgpack_unpacker
            serializer = :to_msgpack.to_proc
            feeder = ->(d){
              parser.feed_each(d){|obj|
                block.call(obj, bytes, serializer)
                bytes = 0
              }
            }
          end
        end

        bytes += data.bytesize
        feeder.call(data)
      end
    end

    # Builds the ack payload for a message option.
    #
    # The option comes straight from the peer, so anything that is not a Hash
    # is treated as "no ack requested" instead of being indexed blindly.
    #
    # @param option [Hash, nil] option part of the received message
    # @return [Hash, nil] { 'ack' => chunk id } or nil when no ack is requested
    def response(option)
      return nil unless option.is_a?(Hash)

      chunk_id = option['chunk']
      chunk_id ? { 'ack' => chunk_id } : nil
    end

    # Handles one decoded message in the :established state and emits its
    # events to the router. The mode is chosen from the type of msg[1]:
    # String -> PackedForward, Array -> Forward, anything else -> Message.
    #
    # @param msg decoded message (expected to be an Array)
    # @param chunk_size [Integer] number of bytes received for this message
    # @param conn the connection the message arrived on
    # @return the option part of the message (used to build the ack), or nil
    #   when the message was ignored or dropped
    def on_message(msg, chunk_size, conn)
      if msg.nil?
        # for future TCP heartbeat_request
        return
      end

      # TODO: raise an exception if broken chunk is generated by recoverable situation
      unless msg.is_a?(Array)
        log.warn "incoming chunk is broken:", host: conn.remote_host, msg: msg
        return
      end

      tag = msg[0]
      entries = msg[1]

      if @chunk_size_limit && (chunk_size > @chunk_size_limit)
        log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, host: conn.remote_host, limit: @chunk_size_limit, size: chunk_size
        return
      elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit)
        log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, host: conn.remote_host, limit: @chunk_size_warn_limit, size: chunk_size
      end

      # A configured tag replaces the incoming one; the prefix is applied afterwards.
      tag = @tag.dup if @tag
      tag = "#{@add_tag_prefix}.#{tag}" if @add_tag_prefix

      case entries
      when String
        # PackedForward
        option = msg[2]
        size = (option && option['size']) || 0
        es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
        es = es_class.new(entries, nil, size.to_i)
        es = check_and_skip_invalid_event(tag, es, conn.remote_host) if @skip_invalid_event
        if @enable_field_injection
          es = add_source_info(es, conn)
        end
        router.emit_stream(tag, es)

      when Array
        # Forward
        es = if @skip_invalid_event
               check_and_skip_invalid_event(tag, entries, conn.remote_host)
             else
               stream = Fluent::MultiEventStream.new
               entries.each { |e|
                 record = e[1]
                 next if record.nil?
                 time = e[0]
                 time = Fluent::Engine.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime
                 stream.add(time, record)
               }
               stream
             end
        if @enable_field_injection
          es = add_source_info(es, conn)
        end
        router.emit_stream(tag, es)
        option = msg[2]

      else
        # Message
        time = msg[1]
        record = msg[2]
        if @skip_invalid_event && invalid_event?(tag, time, record)
          log.warn "got invalid event and drop it:", host: conn.remote_host, tag: tag, time: time, record: record
          return msg[3] # retry never succeeded so return ack and drop incoming event.
        end
        return if record.nil?
        time = Fluent::Engine.now if time.to_i == 0
        if @enable_field_injection
          record[@source_address_key] = conn.remote_addr if @source_address_key
          record[@source_hostname_key] = conn.remote_host if @source_hostname_key
        end
        router.emit(tag, time, record)
        option = msg[3]
      end

      # return option for response
      option
    end

    # @return [Boolean] true unless time is an Integer or Fluent::EventTime,
    #   record is a Hash and tag is a String
    def invalid_event?(tag, time, record)
      !((time.is_a?(Integer) || time.is_a?(::Fluent::EventTime)) && record.is_a?(Hash) && tag.is_a?(String))
    end

    # Copies +es+ into a new MultiEventStream, leaving out (and logging) every
    # event rejected by #invalid_event?.
    #
    # @param tag tag of the stream (part of the validity check)
    # @param es any object whose #each yields (time, record) pairs
    # @param remote_host host name used in the warning log
    # @return [Fluent::MultiEventStream]
    def check_and_skip_invalid_event(tag, es, remote_host)
      new_es = Fluent::MultiEventStream.new
      es.each { |time, record|
        if invalid_event?(tag, time, record)
          log.warn "skip invalid event:", host: remote_host, tag: tag, time: time, record: record
          next
        end
        new_es.add(time, record)
      }
      new_es
    end

    # Returns a new stream whose records carry the peer address and/or host
    # name under the configured keys. Records are modified in place.
    #
    # The three branches keep the per-record work free of conditionals and
    # query the connection only for the values that are actually needed.
    #
    # @param es event stream yielding (time, record) pairs
    # @param conn connection providing remote_addr / remote_host
    # @return [Fluent::MultiEventStream]
    # @raise [RuntimeError] when neither key is configured
    def add_source_info(es, conn)
      new_es = Fluent::MultiEventStream.new
      if @source_address_key && @source_hostname_key
        address = conn.remote_addr
        hostname = conn.remote_host
        es.each { |time, record|
          record[@source_address_key] = address
          record[@source_hostname_key] = hostname
          new_es.add(time, record)
        }
      elsif @source_address_key
        address = conn.remote_addr
        es.each { |time, record|
          record[@source_address_key] = address
          new_es.add(time, record)
        }
      elsif @source_hostname_key
        hostname = conn.remote_host
        es.each { |time, record|
          record[@source_hostname_key] = hostname
          new_es.add(time, record)
        }
      else
        raise "BUG: don't call this method in this case"
      end
      new_es
    end

    # Selects the configured <user> entries matching +username+. When +node+
    # lists users, the user must also be in that list.
    #
    # @param node [Hash, nil] matching entry of @nodes, if any
    # @param username [String] user name sent by the client
    # @return [Array] matching <user> sections
    def select_authenticate_users(node, username)
      if node.nil? || node[:users].empty?
        @security.users.select{|u| u.username == username}
      else
        @security.users.select{|u| node[:users].include?(u.username) && u.username == username}
      end
    end

    # @return [String] 16 random bytes from SecureRandom
    def generate_salt
      ::SecureRandom.random_bytes(16)
    end

    # Builds the HELO message that opens the handshake.
    #
    # @param nonce [String] per-connection nonce
    # @param user_auth_salt [String] per-connection salt for password digests
    # @return [Array] ['HELO', options(hash)]
    def generate_helo(nonce, user_auth_salt)
      log.debug "generating helo"
      # ['HELO', options(hash)]
      ['HELO', {'nonce' => nonce, 'auth' => (@security ? user_auth_salt : ''), 'keepalive' => !@deny_keepalive}]
    end

    # Verifies a PING message received from a client.
    #
    # @param message decoded PING message (untrusted input)
    # @param remote_addr [String] address of the peer
    # @param user_auth_salt [String] salt that was sent in HELO
    # @param nonce [String] nonce that was sent in HELO
    # @return [Array] [true, shared_key_salt, shared_key] on success,
    #   [false, reason, nil] on failure
    def check_ping(message, remote_addr, user_auth_salt, nonce)
      log.debug "checking ping"
      # ['PING', self_hostname, shared_key_salt, sha512_hex(shared_key_salt + self_hostname + nonce + shared_key), username || '', sha512_hex(auth_salt + username + password) || '']
      # The message is decoded from an unauthenticated peer, so its type is checked before indexing.
      unless message.is_a?(Array) && message.size == 6 && message[0] == 'PING'
        return false, 'invalid ping message', nil
      end
      _ping, hostname, shared_key_salt, shared_key_hexdigest, username, password_digest = message

      # An address that cannot be matched against a node counts as "no match".
      node = @nodes.find do |n|
        begin
          n[:address].include?(remote_addr)
        rescue StandardError
          false
        end
      end
      if !node && !@security.allow_anonymous_source
        log.warn "Anonymous client disallowed", address: remote_addr, hostname: hostname
        return false, "anonymous source host '#{remote_addr}' denied", nil
      end

      shared_key = node ? node[:shared_key] : @security.shared_key
      serverside = Digest::SHA512.new.update(shared_key_salt).update(hostname).update(nonce).update(shared_key).hexdigest
      # NOTE(review): digests are compared with a plain string comparison, not a constant-time one.
      if shared_key_hexdigest != serverside
        log.warn "Shared key mismatch", address: remote_addr, hostname: hostname
        return false, 'shared_key mismatch', nil
      end

      if @security.user_auth
        users = select_authenticate_users(node, username)
        success = users.any? do |user|
          passhash = Digest::SHA512.new.update(user_auth_salt).update(username).update(user[:password]).hexdigest
          passhash == password_digest
        end
        unless success
          log.warn "Authentication failed", address: remote_addr, hostname: hostname, username: username
          return false, 'username/password mismatch', nil
        end
      end

      return true, shared_key_salt, shared_key
    end

    # Builds the PONG message that answers a PING.
    #
    # @param auth_result [Boolean] result of #check_ping
    # @param reason_or_salt [String] failure reason, or the client's shared key salt on success
    # @param nonce [String] nonce that was sent in HELO
    # @param shared_key [String, nil] shared key used for this client
    # @return [Array] the PONG message
    def generate_pong(auth_result, reason_or_salt, nonce, shared_key)
      log.debug "generating pong"
      # ['PONG', bool(authentication result), 'reason if authentication failed', self_hostname, sha512_hex(salt + self_hostname + nonce + sharedkey)]
      unless auth_result
        return ['PONG', false, reason_or_salt, '', '']
      end

      shared_key_digest_hex = Digest::SHA512.new.update(reason_or_salt).update(@security.self_hostname).update(nonce).update(shared_key).hexdigest
      ['PONG', true, '', @security.self_hostname, shared_key_digest_hex]
    end
  end
end