lib/fluent/plugin/out_forward.rb in fluentd-1.1.1 vs lib/fluent/plugin/out_forward.rb in fluentd-1.1.2

- old
+ new

@@ -519,11 +519,10 @@ @port = server.port @weight = server.weight @standby = server.standby @failure = failure @available = true - @state = nil # @hostname is used for certificate verification & TLS SNI host_is_hostname = !(IPAddr.new(@host) rescue false) @hostname = case when host_is_hostname then @host @@ -535,11 +534,10 @@ @username = server.username @password = server.password @shared_key = server.shared_key || (sender.security && sender.security.shared_key) || "" @shared_key_salt = generate_salt - @shared_key_nonce = "" @unpacker = Fluent::Engine.msgpack_unpacker @resolved_host = nil @resolved_time = 0 @@ -550,10 +548,12 @@ attr_reader :name, :host, :port, :weight, :standby, :state attr_reader :sockaddr # used by on_heartbeat attr_reader :failure, :available # for test + RequestInfo = Struct.new(:state, :shared_key_nonce, :auth) + def validate_host_resolution! resolved_host end def available? @@ -566,28 +566,28 @@ def standby? @standby end - def establish_connection(sock) - while available? && @state != :established + def establish_connection(sock, ri) + while available? && ri.state != :established begin # TODO: On Ruby 2.2 or earlier, read_nonblock doesn't work expectedly. # We need rewrite around here using new socket/server plugin helper. buf = sock.read_nonblock(@sender.read_length) if buf.empty? sleep @sender.read_interval next end @unpacker.feed_each(buf) do |data| - on_read(sock, data) + on_read(sock, ri, data) end rescue IO::WaitReadable # If the exception is Errno::EWOULDBLOCK or Errno::EAGAIN, it is extended by IO::WaitReadable. # So IO::WaitReadable can be used to rescue the exceptions for retrying read_nonblock. # https//docs.ruby-lang.org/en/2.3.0/IO.html#method-i-read_nonblock - sleep @sender.read_interval unless @state == :established + sleep @sender.read_interval unless ri.state == :established rescue SystemCallError => e @log.warn "disconnected by error", host: @host, port: @port, error: e disable! break rescue EOFError @@ -597,13 +597,13 @@ end end end def send_data_actual(sock, tag, chunk) - @state = @sender.security ? :helo : :established - if @state != :established - establish_connection(sock) + ri = RequestInfo.new(@sender.security ? :helo : :established) + if ri.state != :established + establish_connection(sock, ri) end unless available? raise ConnectionClosedError, "failed to establish connection with node #{@name}" end @@ -754,43 +754,43 @@ def generate_salt SecureRandom.hex(16) end - def check_helo(message) + def check_helo(ri, message) @log.debug "checking helo" # ['HELO', options(hash)] unless message.size == 2 && message[0] == 'HELO' return false end opts = message[1] || {} # make shared_key_check failed (instead of error) if protocol version mismatch exist - @shared_key_nonce = opts['nonce'] || '' - @authentication = opts['auth'] || '' + ri.shared_key_nonce = opts['nonce'] || '' + ri.auth = opts['auth'] || '' true end - def generate_ping + def generate_ping(ri) @log.debug "generating ping" # ['PING', self_hostname, sharedkey\_salt, sha512\_hex(sharedkey\_salt + self_hostname + nonce + shared_key), # username || '', sha512\_hex(auth\_salt + username + password) || ''] shared_key_hexdigest = Digest::SHA512.new.update(@shared_key_salt) .update(@sender.security.self_hostname) - .update(@shared_key_nonce) + .update(ri.shared_key_nonce) .update(@shared_key) .hexdigest ping = ['PING', @sender.security.self_hostname, @shared_key_salt, shared_key_hexdigest] - if !@authentication.empty? - password_hexdigest = Digest::SHA512.new.update(@authentication).update(@username).update(@password).hexdigest + if !ri.auth.empty? + password_hexdigest = Digest::SHA512.new.update(ri.auth).update(@username).update(@password).hexdigest ping.push(@username, password_hexdigest) else ping.push('','') end ping end - def check_pong(message) + def check_pong(ri, message) @log.debug "checking pong" # ['PONG', bool(authentication result), 'reason if authentication failed', # self_hostname, sha512\_hex(salt + self_hostname + nonce + sharedkey)] unless message.size == 5 && message[0] == 'PONG' return false, 'invalid format for PONG message' @@ -803,40 +803,40 @@ if hostname == @sender.security.self_hostname return false, 'same hostname between input and output: invalid configuration' end - clientside = Digest::SHA512.new.update(@shared_key_salt).update(hostname).update(@shared_key_nonce).update(@shared_key).hexdigest + clientside = Digest::SHA512.new.update(@shared_key_salt).update(hostname).update(ri.shared_key_nonce).update(@shared_key).hexdigest unless shared_key_hexdigest == clientside return false, 'shared key mismatch' end return true, nil end - def on_read(sock, data) + def on_read(sock, ri, data) @log.trace __callee__ - case @state + case ri.state when :helo - unless check_helo(data) + unless check_helo(ri, data) @log.warn "received invalid helo message from #{@name}" disable! # shutdown return end - sock.write(generate_ping.to_msgpack) - @state = :pingpong + sock.write(generate_ping(ri).to_msgpack) + ri.state = :pingpong when :pingpong - succeeded, reason = check_pong(data) + succeeded, reason = check_pong(ri, data) unless succeeded @log.warn "connection refused to #{@name}: #{reason}" disable! # shutdown return end - @state = :established + ri.state = :established @log.debug "connection established", host: @host, port: @port else - raise "BUG: unknown session state: #{@state}" + raise "BUG: unknown session state: #{ri.state}" end end end # Override Node to disable heartbeat