lib/fluent/plugin/out_forward.rb in fluentd-1.1.1 vs lib/fluent/plugin/out_forward.rb in fluentd-1.1.2
- old
+ new
@@ -519,11 +519,10 @@
@port = server.port
@weight = server.weight
@standby = server.standby
@failure = failure
@available = true
- @state = nil
# @hostname is used for certificate verification & TLS SNI
host_is_hostname = !(IPAddr.new(@host) rescue false)
@hostname = case
when host_is_hostname then @host
@@ -535,11 +534,10 @@
@username = server.username
@password = server.password
@shared_key = server.shared_key || (sender.security && sender.security.shared_key) || ""
@shared_key_salt = generate_salt
- @shared_key_nonce = ""
@unpacker = Fluent::Engine.msgpack_unpacker
@resolved_host = nil
@resolved_time = 0
@@ -550,10 +548,12 @@
attr_reader :name, :host, :port, :weight, :standby, :state
attr_reader :sockaddr # used by on_heartbeat
attr_reader :failure, :available # for test
+ RequestInfo = Struct.new(:state, :shared_key_nonce, :auth)
+
def validate_host_resolution!
resolved_host
end
def available?
@@ -566,28 +566,28 @@
def standby?
@standby
end
- def establish_connection(sock)
- while available? && @state != :established
+ def establish_connection(sock, ri)
+ while available? && ri.state != :established
begin
# TODO: On Ruby 2.2 or earlier, read_nonblock doesn't work expectedly.
# We need rewrite around here using new socket/server plugin helper.
buf = sock.read_nonblock(@sender.read_length)
if buf.empty?
sleep @sender.read_interval
next
end
@unpacker.feed_each(buf) do |data|
- on_read(sock, data)
+ on_read(sock, ri, data)
end
rescue IO::WaitReadable
# If the exception is Errno::EWOULDBLOCK or Errno::EAGAIN, it is extended by IO::WaitReadable.
# So IO::WaitReadable can be used to rescue the exceptions for retrying read_nonblock.
# https//docs.ruby-lang.org/en/2.3.0/IO.html#method-i-read_nonblock
- sleep @sender.read_interval unless @state == :established
+ sleep @sender.read_interval unless ri.state == :established
rescue SystemCallError => e
@log.warn "disconnected by error", host: @host, port: @port, error: e
disable!
break
rescue EOFError
@@ -597,13 +597,13 @@
end
end
end
def send_data_actual(sock, tag, chunk)
- @state = @sender.security ? :helo : :established
- if @state != :established
- establish_connection(sock)
+ ri = RequestInfo.new(@sender.security ? :helo : :established)
+ if ri.state != :established
+ establish_connection(sock, ri)
end
unless available?
raise ConnectionClosedError, "failed to establish connection with node #{@name}"
end
@@ -754,43 +754,43 @@
def generate_salt
SecureRandom.hex(16)
end
- def check_helo(message)
+ def check_helo(ri, message)
@log.debug "checking helo"
# ['HELO', options(hash)]
unless message.size == 2 && message[0] == 'HELO'
return false
end
opts = message[1] || {}
# make shared_key_check failed (instead of error) if protocol version mismatch exist
- @shared_key_nonce = opts['nonce'] || ''
- @authentication = opts['auth'] || ''
+ ri.shared_key_nonce = opts['nonce'] || ''
+ ri.auth = opts['auth'] || ''
true
end
- def generate_ping
+ def generate_ping(ri)
@log.debug "generating ping"
# ['PING', self_hostname, sharedkey\_salt, sha512\_hex(sharedkey\_salt + self_hostname + nonce + shared_key),
# username || '', sha512\_hex(auth\_salt + username + password) || '']
shared_key_hexdigest = Digest::SHA512.new.update(@shared_key_salt)
.update(@sender.security.self_hostname)
- .update(@shared_key_nonce)
+ .update(ri.shared_key_nonce)
.update(@shared_key)
.hexdigest
ping = ['PING', @sender.security.self_hostname, @shared_key_salt, shared_key_hexdigest]
- if !@authentication.empty?
- password_hexdigest = Digest::SHA512.new.update(@authentication).update(@username).update(@password).hexdigest
+ if !ri.auth.empty?
+ password_hexdigest = Digest::SHA512.new.update(ri.auth).update(@username).update(@password).hexdigest
ping.push(@username, password_hexdigest)
else
ping.push('','')
end
ping
end
- def check_pong(message)
+ def check_pong(ri, message)
@log.debug "checking pong"
# ['PONG', bool(authentication result), 'reason if authentication failed',
# self_hostname, sha512\_hex(salt + self_hostname + nonce + sharedkey)]
unless message.size == 5 && message[0] == 'PONG'
return false, 'invalid format for PONG message'
@@ -803,40 +803,40 @@
if hostname == @sender.security.self_hostname
return false, 'same hostname between input and output: invalid configuration'
end
- clientside = Digest::SHA512.new.update(@shared_key_salt).update(hostname).update(@shared_key_nonce).update(@shared_key).hexdigest
+ clientside = Digest::SHA512.new.update(@shared_key_salt).update(hostname).update(ri.shared_key_nonce).update(@shared_key).hexdigest
unless shared_key_hexdigest == clientside
return false, 'shared key mismatch'
end
return true, nil
end
- def on_read(sock, data)
+ def on_read(sock, ri, data)
@log.trace __callee__
- case @state
+ case ri.state
when :helo
- unless check_helo(data)
+ unless check_helo(ri, data)
@log.warn "received invalid helo message from #{@name}"
disable! # shutdown
return
end
- sock.write(generate_ping.to_msgpack)
- @state = :pingpong
+ sock.write(generate_ping(ri).to_msgpack)
+ ri.state = :pingpong
when :pingpong
- succeeded, reason = check_pong(data)
+ succeeded, reason = check_pong(ri, data)
unless succeeded
@log.warn "connection refused to #{@name}: #{reason}"
disable! # shutdown
return
end
- @state = :established
+ ri.state = :established
@log.debug "connection established", host: @host, port: @port
else
- raise "BUG: unknown session state: #{@state}"
+ raise "BUG: unknown session state: #{ri.state}"
end
end
end
# Override Node to disable heartbeat