require_relative '../helper'
require 'fluent/test'
class ForwardOutputTest < Test::Unit::TestCase
def setup
Fluent::Test.setup
end
TARGET_HOST = '127.0.0.1'
TARGET_PORT = 13999
CONFIG = %[
send_timeout 51
name test
host #{TARGET_HOST}
port #{TARGET_PORT}
]
TARGET_CONFIG = %[
port #{TARGET_PORT}
bind #{TARGET_HOST}
]
def create_driver(conf=CONFIG)
Fluent::Test::OutputTestDriver.new(Fluent::ForwardOutput) {
attr_reader :responses, :exceptions
def initialize
super
@responses = []
@exceptions = []
end
def send_data(node, tag, chunk)
# Original #send_data returns nil when it does not wait for responses or when on response timeout.
@responses << super(node, tag, chunk)
rescue => e
@exceptions << e
raise e
end
}.configure(conf)
end
def test_configure
d = create_driver
nodes = d.instance.nodes
assert_equal 51, d.instance.send_timeout
assert_equal :udp, d.instance.heartbeat_type
assert_equal 1, nodes.length
node = nodes.first
assert_equal "test", node.name
assert_equal '127.0.0.1', node.host
assert_equal 13999, node.port
end
def test_configure_tcp_heartbeat
d = create_driver(CONFIG + "\nheartbeat_type tcp")
assert_equal :tcp, d.instance.heartbeat_type
end
def test_configure_none_heartbeat
d = create_driver(CONFIG + "\nheartbeat_type none")
assert_equal :none, d.instance.heartbeat_type
end
def test_configure_dns_round_robin
assert_raise(Fluent::ConfigError) do
create_driver(CONFIG + "\nheartbeat_type udp\ndns_round_robin true")
end
d = create_driver(CONFIG + "\nheartbeat_type tcp\ndns_round_robin true")
assert_equal true, d.instance.dns_round_robin
assert_equal true, d.instance.nodes.first.conf.dns_round_robin
d = create_driver(CONFIG + "\nheartbeat_type none\ndns_round_robin true")
assert_equal true, d.instance.dns_round_robin
assert_equal true, d.instance.nodes.first.conf.dns_round_robin
end
def test_configure_no_server
assert_raise(Fluent::ConfigError, 'forward output plugin requires at least one is required') do
create_driver('')
end
end
def test_phi_failure_detector
d = create_driver(CONFIG + %[phi_failure_detector false \n phi_threshold 0])
node = d.instance.nodes.first
stub(node.failure).phi { raise 'Should not be called' }
node.tick
assert_equal node.available, true
d = create_driver(CONFIG + %[phi_failure_detector true \n phi_threshold 0])
node = d.instance.nodes.first
node.tick
assert_equal node.available, false
end
def test_wait_response_timeout_config
d = create_driver(CONFIG)
assert_equal false, d.instance.extend_internal_protocol
assert_equal false, d.instance.require_ack_response
assert_equal 190, d.instance.ack_response_timeout
d = create_driver(CONFIG + %[
require_ack_response true
ack_response_timeout 2s
])
assert d.instance.extend_internal_protocol
assert d.instance.require_ack_response
assert_equal 2, d.instance.ack_response_timeout
end
def test_send_to_a_node_supporting_responses
target_input_driver = create_target_input_driver(true)
d = create_driver(CONFIG + %[flush_interval 1s])
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
records = [
{"a" => 1},
{"a" => 2}
]
d.register_run_post_condition do
d.instance.responses.length == 1
end
target_input_driver.run do
d.run do
records.each do |record|
d.emit record, time
end
end
end
emits = target_input_driver.emits
assert_equal ['test', time, records[0]], emits[0]
assert_equal ['test', time, records[1]], emits[1]
assert_equal [nil], d.instance.responses # not attempt to receive responses, so nil is returned
assert_empty d.instance.exceptions
end
def test_send_to_a_node_not_supporting_responses
target_input_driver = create_target_input_driver
d = create_driver(CONFIG + %[flush_interval 1s])
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
records = [
{"a" => 1},
{"a" => 2}
]
d.register_run_post_condition do
d.instance.responses.length == 1
end
target_input_driver.run do
d.run do
records.each do |record|
d.emit record, time
end
end
end
emits = target_input_driver.emits
assert_equal ['test', time, records[0]], emits[0]
assert_equal ['test', time, records[1]], emits[1]
assert_equal [nil], d.instance.responses # not attempt to receive responses, so nil is returned
assert_empty d.instance.exceptions
end
def test_require_a_node_supporting_responses_to_respond_with_ack
target_input_driver = create_target_input_driver(true)
d = create_driver(CONFIG + %[
flush_interval 1s
require_ack_response true
ack_response_timeout 1s
])
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
records = [
{"a" => 1},
{"a" => 2}
]
d.register_run_post_condition do
d.instance.responses.length == 1
end
target_input_driver.run do
d.run do
records.each do |record|
d.emit record, time
end
end
end
emits = target_input_driver.emits
assert_equal ['test', time, records[0]], emits[0]
assert_equal ['test', time, records[1]], emits[1]
assert_equal 1, d.instance.responses.length
assert d.instance.responses[0].has_key?('ack')
assert_empty d.instance.exceptions
end
def test_require_a_node_not_supporting_responses_to_respond_with_ack
# in_forward, that doesn't support ack feature, and keep connection alive
target_input_driver = create_target_input_driver
d = create_driver(CONFIG + %[
flush_interval 1s
require_ack_response true
ack_response_timeout 1s
])
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
records = [
{"a" => 1},
{"a" => 2}
]
d.register_run_post_condition do
d.instance.responses.length == 1
end
d.run_timeout = 2
target_input_driver.run do
d.run do
records.each do |record|
d.emit record, time
end
end
end
emits = target_input_driver.emits
assert_equal ['test', time, records[0]], emits[0]
assert_equal ['test', time, records[1]], emits[1]
node = d.instance.nodes.first
assert_equal false, node.available # node is regarded as unavailable when timeout
assert_empty d.instance.responses # send_data() raises exception, so response is missing
assert_equal 1, d.instance.exceptions.size
end
# bdf1f4f104c00a791aa94dc20087fe2011e1fd83
def test_require_a_node_not_supporting_responses_2_to_respond_with_ack
# in_forward, that doesn't support ack feature, and disconnect immediately
target_input_driver = create_target_input_driver(false, true)
d = create_driver(CONFIG + %[
flush_interval 1s
require_ack_response true
ack_response_timeout 5s
])
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
records = [
{"a" => 1},
{"a" => 2}
]
d.register_run_post_condition do
d.instance.responses.length == 1
end
d.run_timeout = 2
target_input_driver.run do
d.run do
records.each do |record|
d.emit record, time
end
end
end
emits = target_input_driver.emits
assert_equal ['test', time, records[0]], emits[0]
assert_equal ['test', time, records[1]], emits[1]
assert_equal 0, d.instance.responses.size
assert_equal 1, d.instance.exceptions.size # send_data() fails and to be retried
node = d.instance.nodes.first
assert_equal false, node.available # node is regarded as unavailable when unexpected EOF
end
def create_target_input_driver(do_respond=false, disconnect=false, conf=TARGET_CONFIG)
require 'fluent/plugin/in_forward'
DummyEngineDriver.new(Fluent::ForwardInput) {
handler_class = Class.new(Fluent::ForwardInput::Handler) { |klass|
attr_reader :chunk_counter # for checking if received data is successfully deserialized
def initialize(sock, log, on_message)
@sock = sock
@log = log
@chunk_counter = 0
@on_message = on_message
end
if do_respond
def write(data)
@sock.write data
rescue => e
@sock.close
end
else
def write(data)
# do nothing
end
end
def close
@sock.close
end
}
define_method(:start) do
@thread = Thread.new do
Socket.tcp_server_loop(@host, @port) do |sock, client_addrinfo|
begin
handler = handler_class.new(sock, @log, method(:on_message))
loop do
raw_data = sock.recv(1024)
handler.on_read(raw_data)
# chunk_counter is reset to zero only after all the data have been received and successfully deserialized.
break if handler.chunk_counter == 0
end
if disconnect
handler.close
sock = nil
end
sleep # wait for connection to be closed by client
ensure
sock.close if sock
end
end
end
end
def shutdown
@thread.kill
@thread.join
end
}.configure(conf).inject_router()
end
def test_heartbeat_type_none
d = create_driver(CONFIG + "\nheartbeat_type none")
node = d.instance.nodes.first
assert_equal Fluent::ForwardOutput::NoneHeartbeatNode, node.class
d.instance.start
assert_nil d.instance.instance_variable_get(:@loop) # no HeartbeatHandler, or HeartbeatRequestTimer
assert_nil d.instance.instance_variable_get(:@thread) # no HeartbeatHandler, or HeartbeatRequestTimer
stub(node.failure).phi { raise 'Should not be called' }
node.tick
assert_equal node.available, true
end
class DummyEngineDriver < Fluent::Test::TestDriver
def initialize(klass, &block)
super(klass, &block)
@engine = DummyEngineClass.new
@klass = klass
# To avoid accessing Fluent::Engine, set Engine as a plugin's class constant (Fluent::SomePlugin::Engine).
# But this makes it impossible to run tests concurrently by threading in a process.
@klass.const_set(:Engine, @engine)
end
def inject_router
@instance.router = @engine
self
end
def run(&block)
super(&block)
@klass.class_eval do
remove_const(:Engine)
end
end
def emits
all = []
@engine.emit_streams.each {|tag,events|
events.each {|time,record|
all << [tag, time, record]
}
}
all
end
class DummyEngineClass
attr_reader :emit_streams
def initialize
@emit_streams ||= []
end
def emit_stream(tag, es)
@emit_streams << [tag, es.to_a]
end
end
end
end