require_relative '../helper' require 'fluent/test/driver/output' require 'fluent/plugin/out_forward' require 'flexmock/test_unit' require 'fluent/test/driver/input' require 'fluent/plugin/in_forward' class ForwardOutputTest < Test::Unit::TestCase def setup Fluent::Test.setup FileUtils.rm_rf(TMP_DIR) FileUtils.mkdir_p(TMP_DIR) @d = nil @target_port = unused_port end def teardown @d.instance_shutdown if @d @port = nil end TMP_DIR = File.join(__dir__, "../tmp/out_forward#{ENV['TEST_ENV_NUMBER']}") TARGET_HOST = '127.0.0.1' def config %[ send_timeout 51 heartbeat_type udp name test host #{TARGET_HOST} port #{@target_port} ] end def target_config %[ port #{@target_port} bind #{TARGET_HOST} ] end def create_driver(conf=config) Fluent::Test::Driver::Output.new(Fluent::Plugin::ForwardOutput) { attr_reader :sent_chunk_ids, :ack_handler, :discovery_manager def initialize super @sent_chunk_ids = [] end def try_write(chunk) retval = super @sent_chunk_ids << chunk.unique_id retval end }.configure(conf) end test 'configure' do @d = d = create_driver(%[ self_hostname localhost name test host #{TARGET_HOST} port #{@target_port} ]) nodes = d.instance.nodes assert_equal 60, d.instance.send_timeout assert_equal :transport, d.instance.heartbeat_type assert_equal 1, nodes.length assert_nil d.instance.connect_timeout node = nodes.first assert_equal "test", node.name assert_equal '127.0.0.1', node.host assert_equal @target_port, node.port end test 'configure_traditional' do @d = d = create_driver(< name test host #{TARGET_HOST} port #{@target_port} buffer_chunk_limit 10m EOL instance = d.instance assert instance.chunk_key_tag assert !instance.chunk_key_time assert_equal [], instance.chunk_keys assert{ instance.buffer.is_a?(Fluent::Plugin::MemoryBuffer) } assert_equal( 10*1024*1024, instance.buffer.chunk_limit_size ) end test 'configure timeouts' do @d = d = create_driver(%[ send_timeout 30 connect_timeout 10 hard_timeout 15 ack_response_timeout 20 host #{TARGET_HOST} port #{@target_port} ]) assert_equal 30, d.instance.send_timeout assert_equal 10, d.instance.connect_timeout assert_equal 15, d.instance.hard_timeout assert_equal 20, d.instance.ack_response_timeout end test 'configure_udp_heartbeat' do @d = d = create_driver(config + "\nheartbeat_type udp") assert_equal :udp, d.instance.heartbeat_type end test 'configure_none_heartbeat' do @d = d = create_driver(config + "\nheartbeat_type none") assert_equal :none, d.instance.heartbeat_type end test 'configure_expire_dns_cache' do @d = d = create_driver(config + "\nexpire_dns_cache 5") assert_equal 5, d.instance.expire_dns_cache end test 'configure_dns_round_robin udp' do assert_raise(Fluent::ConfigError) do create_driver(config + "\nheartbeat_type udp\ndns_round_robin true") end end test 'configure_dns_round_robin transport' do @d = d = create_driver(config + "\nheartbeat_type transport\ndns_round_robin true") assert_equal true, d.instance.dns_round_robin end test 'configure_dns_round_robin none' do @d = d = create_driver(config + "\nheartbeat_type none\ndns_round_robin true") assert_equal true, d.instance.dns_round_robin end test 'configure_no_server' do assert_raise(Fluent::ConfigError, 'forward output plugin requires at least one is required') do create_driver('') end end test 'configure with ignore_network_errors_at_startup' do normal_conf = config_element('match', '**', {}, [ config_element('server', '', {'name' => 'test', 'host' => 'unexisting.yaaaaaaaaaaaaaay.host.example.com'}) ]) assert_raise SocketError do create_driver(normal_conf) end conf = config_element('match', '**', {'ignore_network_errors_at_startup' => 'true'}, [ config_element('server', '', {'name' => 'test', 'host' => 'unexisting.yaaaaaaaaaaaaaay.host.example.com'}) ]) @d = d = create_driver(conf) expected_log = "failed to resolve node name when configured" expected_detail = 'server="test" error_class=SocketError' logs = d.logs assert{ logs.any?{|log| log.include?(expected_log) && log.include?(expected_detail) } } end data('CA cert' => 'tls_ca_cert_path', 'non CA cert' => 'tls_cert_path') test 'configure tls_cert_path/tls_ca_cert_path' do |param| dummy_cert_path = File.join(TMP_DIR, "dummy_cert.pem") FileUtils.touch(dummy_cert_path) conf = %[ send_timeout 5 transport tls tls_insecure_mode true #{param} #{dummy_cert_path} host #{TARGET_HOST} port #{@target_port} ] @d = d = create_driver(conf) # In the plugin, tls_ca_cert_path is used for both cases assert_equal([dummy_cert_path], d.instance.tls_ca_cert_path) end sub_test_case "certstore loading parameters for Windows" do test 'certstore related config parameters' do omit "certstore related values raise error on not Windows" if Fluent.windows? conf = %[ send_timeout 5 transport tls tls_cert_logical_store_name Root tls_cert_thumbprint a909502dd82ae41433e6f83886b00d4277a32a7b host #{TARGET_HOST} port #{@target_port} ] assert_raise(Fluent::ConfigError) do create_driver(conf) end end test 'cert_logical_store_name and tls_cert_thumbprint default values' do conf = %[ send_timeout 5 transport tls host #{TARGET_HOST} port #{@target_port} ] @d = d = create_driver(conf) assert_nil d.instance.tls_cert_logical_store_name assert_nil d.instance.tls_cert_thumbprint end data('CA cert' => 'tls_ca_cert_path', 'non CA cert' => 'tls_cert_path') test 'specify tls_cert_logical_store_name and tls_cert_path should raise error' do |param| omit "Loading CertStore feature works only Windows" unless Fluent.windows? dummy_cert_path = File.join(TMP_DIR, "dummy_cert.pem") FileUtils.touch(dummy_cert_path) conf = %[ send_timeout 5 transport tls #{param} #{dummy_cert_path} tls_cert_logical_store_name Root host #{TARGET_HOST} port #{@target_port} ] assert_raise(Fluent::ConfigError) do create_driver(conf) end end test 'configure cert_logical_store_name and tls_cert_thumbprint' do omit "Loading CertStore feature works only Windows" unless Fluent.windows? conf = %[ send_timeout 5 transport tls tls_cert_logical_store_name Root tls_cert_thumbprint a909502dd82ae41433e6f83886b00d4277a32a7b host #{TARGET_HOST} port #{@target_port} ] @d = d = create_driver(conf) assert_equal "Root", d.instance.tls_cert_logical_store_name assert_equal "a909502dd82ae41433e6f83886b00d4277a32a7b", d.instance.tls_cert_thumbprint end end test 'server is an abbreviation of static type of service_discovery' do @d = d = create_driver(%[ host 127.0.0.1 port 1234 @type static host 127.0.0.1 port 1235 ]) assert_equal( [ { host: '127.0.0.1', port: 1234 }, { host: '127.0.0.1', port: 1235 }, ], d.instance.discovery_manager.services.collect do |service| { host: service.host, port: service.port } end ) end test 'pass username and password as empty string to HandshakeProtocol' do config_path = File.join(TMP_DIR, "sd_file.conf") File.open(config_path, 'w') do |file| file.write(%[ - 'host': 127.0.0.1 'port': 1234 'weight': 1 ]) end mock(Fluent::Plugin::ForwardOutput::HandshakeProtocol).new(log: anything, hostname: nil, shared_key: anything, password: '', username: '') @d = d = create_driver(%[ @type file path #{config_path} ]) assert_equal 1, d.instance.discovery_manager.services.size assert_equal '127.0.0.1', d.instance.discovery_manager.services[0].host assert_equal 1234, d.instance.discovery_manager.services[0].port end test 'compress_default_value' do @d = d = create_driver assert_equal :text, d.instance.compress node = d.instance.nodes.first assert_equal :text, node.instance_variable_get(:@compress) end test 'set_compress_is_gzip' do @d = d = create_driver(config + %[compress gzip]) assert_equal :gzip, d.instance.compress assert_equal :gzip, d.instance.buffer.compress node = d.instance.nodes.first assert_equal :gzip, node.instance_variable_get(:@compress) end test 'set_compress_is_gzip_in_buffer_section' do mock = flexmock($log) mock.should_receive(:log).with("buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in ") @d = d = create_driver(config + %[ type memory compress gzip ]) assert_equal :text, d.instance.compress assert_equal :gzip, d.instance.buffer.compress node = d.instance.nodes.first assert_equal :text, node.instance_variable_get(:@compress) end test 'phi_failure_detector disabled' do @d = d = create_driver(config + %[phi_failure_detector false \n phi_threshold 0]) node = d.instance.nodes.first stub(node.failure).phi { raise 'Should not be called' } node.tick assert_true node.available? end test 'phi_failure_detector enabled' do @d = d = create_driver(config + %[phi_failure_detector true \n phi_threshold 0]) node = d.instance.nodes.first node.tick assert_false node.available? end test 'require_ack_response is disabled in default' do @d = d = create_driver(config) assert_equal false, d.instance.require_ack_response assert_equal 190, d.instance.ack_response_timeout end test 'require_ack_response can be enabled' do @d = d = create_driver(config + %[ require_ack_response true ack_response_timeout 2s ]) d.instance_start assert d.instance.require_ack_response assert_equal 2, d.instance.ack_response_timeout end test 'suspend_flush is disable before before_shutdown' do @d = d = create_driver(config + %[ require_ack_response true ack_response_timeout 2s ]) d.instance_start assert_false d.instance.instance_variable_get(:@suspend_flush) end test 'suspend_flush should be enabled and try_flush returns nil after before_shutdown' do @d = d = create_driver(config + %[ require_ack_response true ack_response_timeout 2s ]) d.instance_start d.instance.before_shutdown assert_true d.instance.instance_variable_get(:@suspend_flush) assert_nil d.instance.try_flush end test 'verify_connection_at_startup is disabled in default' do @d = d = create_driver(config) assert_false d.instance.verify_connection_at_startup end test 'verify_connection_at_startup can be enabled' do @d = d = create_driver(config + %[ verify_connection_at_startup true ]) assert_true d.instance.verify_connection_at_startup end test 'send tags in str (utf-8 strings)' do target_input_driver = create_target_input_driver @d = d = create_driver(config + %[flush_interval 1s]) time = event_time("2011-01-02 13:14:15 UTC") tag_in_utf8 = "test.utf8".encode("utf-8") tag_in_ascii = "test.ascii".encode("ascii-8bit") emit_events = [ [tag_in_utf8, time, {"a" => 1}], [tag_in_ascii, time, {"a" => 2}], ] stub(d.instance.ack_handler).read_ack_from_sock(anything).never assert_rr do target_input_driver.run(expect_records: 2) do d.run do emit_events.each do |tag, t, record| d.feed(tag, t, record) end end end end events = target_input_driver.events assert_equal_event_time(time, events[0][1]) assert_equal ['test.utf8', time, emit_events[0][2]], events[0] assert_equal Encoding::UTF_8, events[0][0].encoding assert_equal_event_time(time, events[1][1]) assert_equal ['test.ascii', time, emit_events[1][2]], events[1] assert_equal Encoding::UTF_8, events[1][0].encoding end test 'send_with_time_as_integer' do target_input_driver = create_target_input_driver @d = d = create_driver(config + %[flush_interval 1s]) time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] stub(d.instance.ack_handler).read_ack_from_sock(anything).never assert_rr do target_input_driver.run(expect_records: 2) do d.run(default_tag: 'test') do records.each do |record| d.feed(time, record) end end end end events = target_input_driver.events assert_equal_event_time(time, events[0][1]) assert_equal ['test', time, records[0]], events[0] assert_equal_event_time(time, events[1][1]) assert_equal ['test', time, records[1]], events[1] end test 'send_without_time_as_integer' do target_input_driver = create_target_input_driver @d = d = create_driver(config + %[ flush_interval 1s time_as_integer false ]) time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] stub(d.instance.ack_handler).read_ack_from_sock(anything).never assert_rr do target_input_driver.run(expect_records: 2) do d.run(default_tag: 'test') do records.each do |record| d.feed(time, record) end end end end events = target_input_driver.events assert_equal_event_time(time, events[0][1]) assert_equal ['test', time, records[0]], events[0] assert_equal_event_time(time, events[1][1]) assert_equal ['test', time, records[1]], events[1] end test 'send_comprssed_message_pack_stream_if_compress_is_gzip' do target_input_driver = create_target_input_driver @d = d = create_driver(config + %[ flush_interval 1s compress gzip ]) time = event_time('2011-01-02 13:14:15 UTC') records = [ {"a" => 1}, {"a" => 2} ] target_input_driver.run(expect_records: 2) do d.run(default_tag: 'test') do records.each do |record| d.feed(time, record) end end end event_streams = target_input_driver.event_streams assert_true event_streams[0][1].is_a?(Fluent::CompressedMessagePackEventStream) events = target_input_driver.events assert_equal ['test', time, records[0]], events[0] assert_equal ['test', time, records[1]], events[1] end test 'send_to_a_node_supporting_responses' do target_input_driver = create_target_input_driver @d = d = create_driver(config + %[flush_interval 1s]) time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] # not attempt to receive responses stub(d.instance.ack_handler).read_ack_from_sock(anything).never assert_rr do target_input_driver.run(expect_records: 2) do d.run(default_tag: 'test') do records.each do |record| d.feed(time, record) end end end end events = target_input_driver.events assert_equal ['test', time, records[0]], events[0] assert_equal ['test', time, records[1]], events[1] end test 'send_to_a_node_not_supporting_responses' do target_input_driver = create_target_input_driver @d = d = create_driver(config + %[flush_interval 1s]) time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] # not attempt to receive responses stub(d.instance.ack_handler).read_ack_from_sock(anything).never assert_rr do target_input_driver.run(expect_records: 2) do d.run(default_tag: 'test') do records.each do |record| d.feed(time, record) end end end end events = target_input_driver.events assert_equal ['test', time, records[0]], events[0] assert_equal ['test', time, records[1]], events[1] end test 'a node supporting responses' do target_input_driver = create_target_input_driver @d = d = create_driver(config + %[ require_ack_response true ack_response_timeout 1s flush_mode immediate retry_type periodic retry_wait 30s flush_at_shutdown true ]) time = event_time("2011-01-02 13:14:15 UTC") acked_chunk_ids = [] nacked = false mock.proxy(d.instance.ack_handler).read_ack_from_sock(anything) do |info, success| if success acked_chunk_ids << info.chunk_id else nacked = true end [info, success] end records = [ {"a" => 1}, {"a" => 2} ] target_input_driver.run(expect_records: 2, timeout: 5) do d.end_if { acked_chunk_ids.size > 0 || nacked } d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do d.feed([[time, records[0]], [time,records[1]]]) end end assert(!nacked, d.instance.log.logs.join) events = target_input_driver.events assert_equal ['test', time, records[0]], events[0] assert_equal ['test', time, records[1]], events[1] assert_equal 1, acked_chunk_ids.size assert_equal d.instance.sent_chunk_ids.first, acked_chunk_ids.first end test 'a node supporting responses after stop' do target_input_driver = create_target_input_driver @d = d = create_driver(config + %[ require_ack_response true ack_response_timeout 10s flush_mode immediate retry_type periodic retry_wait 30s flush_at_shutdown true ]) time = event_time("2011-01-02 13:14:15 UTC") acked_chunk_ids = [] nacked = false mock.proxy(d.instance.ack_handler).read_ack_from_sock(anything) do |info, success| if success acked_chunk_ids << info.chunk_id else nacked = true end [info, success] end records = [ {"a" => 1}, {"a" => 2} ] target_input_driver.run(expect_records: 2, timeout: 5) do d.end_if { acked_chunk_ids.size > 0 || nacked } d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do d.instance.stop d.feed([[time, records[0]], [time,records[1]]]) d.instance.before_shutdown d.instance.shutdown d.instance.after_shutdown end end assert(!nacked, d.instance.log.logs.join) events = target_input_driver.events assert_equal ['test', time, records[0]], events[0] assert_equal ['test', time, records[1]], events[1] assert_equal 1, acked_chunk_ids.size assert_equal d.instance.sent_chunk_ids.first, acked_chunk_ids.first end data('ack true' => true, 'ack false' => false) test 'TLS transport and ack parameter combination' do |ack| omit "TLS and 'ack false' always fails on AppVeyor. Need to debug" if Fluent.windows? && !ack input_conf = target_config + %[ insecure true ] target_input_driver = create_target_input_driver(conf: input_conf) output_conf = %[ send_timeout 5 require_ack_response #{ack} transport tls tls_insecure_mode true host #{TARGET_HOST} port #{@target_port} #flush_mode immediate flush_interval 0s flush_at_shutdown false # suppress errors in d.instance_shutdown ] @d = d = create_driver(output_conf) time = event_time("2011-01-02 13:14:15 UTC") records = [{"a" => 1}, {"a" => 2}] target_input_driver.run(expect_records: 2, timeout: 3) do d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do records.each do |record| d.feed(time, record) end end end events = target_input_driver.events assert{ events != [] } assert_equal(['test', time, records[0]], events[0]) assert_equal(['test', time, records[1]], events[1]) end test 'a destination node not supporting responses by just ignoring' do target_input_driver = create_target_input_driver(response_stub: ->(_option) { nil }, disconnect: false) @d = d = create_driver(config + %[ require_ack_response true ack_response_timeout 1s flush_mode immediate retry_type periodic retry_wait 30s flush_at_shutdown false # suppress errors in d.instance_shutdown flush_thread_interval 30s ]) node = d.instance.nodes.first delayed_commit_timeout_value = nil time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] target_input_driver.end_if{ d.instance.rollback_count > 0 } target_input_driver.end_if{ !node.available? } target_input_driver.run(expect_records: 2, timeout: 25) do d.run(default_tag: 'test', timeout: 20, wait_flush_completion: false, shutdown: false, flush: false) do delayed_commit_timeout_value = d.instance.delayed_commit_timeout d.feed([[time, records[0]], [time,records[1]]]) end end assert_equal (1 + 2), delayed_commit_timeout_value events = target_input_driver.events assert_equal ['test', time, records[0]], events[0] assert_equal ['test', time, records[1]], events[1] assert{ d.instance.rollback_count > 0 } logs = d.instance.log.logs assert{ logs.any?{|log| log.include?("no response from node. regard it as unavailable.") } } end test 'a destination node not supporting responses by disconnection' do target_input_driver = create_target_input_driver(response_stub: ->(_option) { nil }, disconnect: true) @d = d = create_driver(config + %[ require_ack_response true ack_response_timeout 1s flush_mode immediate retry_type periodic retry_wait 30s flush_at_shutdown false # suppress errors in d.instance_shutdown flush_thread_interval 30s ]) node = d.instance.nodes.first delayed_commit_timeout_value = nil time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] target_input_driver.end_if{ d.instance.rollback_count > 0 } target_input_driver.end_if{ !node.available? } target_input_driver.run(expect_records: 2, timeout: 25) do d.run(default_tag: 'test', timeout: 20, wait_flush_completion: false, shutdown: false, flush: false) do delayed_commit_timeout_value = d.instance.delayed_commit_timeout d.feed([[time, records[0]], [time,records[1]]]) end end assert_equal (1 + 2), delayed_commit_timeout_value events = target_input_driver.events assert_equal ['test', time, records[0]], events[0] assert_equal ['test', time, records[1]], events[1] assert{ d.instance.rollback_count > 0 } logs = d.instance.log.logs assert{ logs.any?{|log| log.include?("no response from node. regard it as unavailable.") } } end test 'authentication_with_shared_key' do input_conf = target_config + %[ self_hostname in.localhost shared_key fluentd-sharedkey host 127.0.0.1 ] target_input_driver = create_target_input_driver(conf: input_conf) output_conf = %[ send_timeout 51 self_hostname localhost shared_key fluentd-sharedkey name test host #{TARGET_HOST} port #{@target_port} shared_key fluentd-sharedkey ] @d = d = create_driver(output_conf) time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] target_input_driver.run(expect_records: 2, timeout: 15) do d.run(default_tag: 'test') do records.each do |record| d.feed(time, record) end end end events = target_input_driver.events assert{ events != [] } assert_equal(['test', time, records[0]], events[0]) assert_equal(['test', time, records[1]], events[1]) end test 'keepalive + shared_key' do input_conf = target_config + %[ self_hostname in.localhost shared_key fluentd-sharedkey ] target_input_driver = create_target_input_driver(conf: input_conf) output_conf = %[ send_timeout 51 keepalive true self_hostname localhost shared_key fluentd-sharedkey name test host #{TARGET_HOST} port #{@target_port} ] @d = d = create_driver(output_conf) time = event_time('2011-01-02 13:14:15 UTC') records = [{ 'a' => 1 }, { 'a' => 2 }] records2 = [{ 'b' => 1}, { 'b' => 2}] target_input_driver.run(expect_records: 4, timeout: 15) do d.run(default_tag: 'test') do records.each do |record| d.feed(time, record) end d.flush # emit buffer to reuse same socket later records2.each do |record| d.feed(time, record) end end end events = target_input_driver.events assert{ events != [] } assert_equal(['test', time, records[0]], events[0]) assert_equal(['test', time, records[1]], events[1]) assert_equal(['test', time, records2[0]], events[2]) assert_equal(['test', time, records2[1]], events[3]) end test 'authentication_with_user_auth' do input_conf = target_config + %[ self_hostname in.localhost shared_key fluentd-sharedkey user_auth true username fluentd password fluentd host 127.0.0.1 ] target_input_driver = create_target_input_driver(conf: input_conf) output_conf = %[ send_timeout 51 self_hostname localhost shared_key fluentd-sharedkey name test host #{TARGET_HOST} port #{@target_port} shared_key fluentd-sharedkey username fluentd password fluentd ] @d = d = create_driver(output_conf) time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] target_input_driver.run(expect_records: 2, timeout: 15) do d.run(default_tag: 'test') do records.each do |record| d.feed(time, record) end end end events = target_input_driver.events assert{ events != [] } assert_equal(['test', time, records[0]], events[0]) assert_equal(['test', time, records[1]], events[1]) end # This test is not 100% but test failed with previous Node implementation which has race condition test 'Node with security is thread-safe on multi threads' do input_conf = target_config + %[ self_hostname in.localhost shared_key fluentd-sharedkey host 127.0.0.1 ] target_input_driver = create_target_input_driver(conf: input_conf) output_conf = %[ send_timeout 51 self_hostname localhost shared_key fluentd-sharedkey name test host #{TARGET_HOST} port #{@target_port} shared_key fluentd-sharedkey ] @d = d = create_driver(output_conf) chunk = Fluent::Plugin::Buffer::MemoryChunk.new(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil)) target_input_driver.run(timeout: 15) do d.run(shutdown: false) do node = d.instance.nodes.first arr = [] 4.times { arr << Thread.new { node.send_data('test', chunk) rescue nil } } arr.each { |a| a.join } end end logs = d.logs assert_false(logs.any? { |log| log.include?("invalid format for PONG message") || log.include?("shared key mismatch") }, "Actual log:\n#{logs.join}") end def create_target_input_driver(response_stub: nil, disconnect: false, conf: target_config) require 'fluent/plugin/in_forward' # TODO: Support actual TCP heartbeat test Fluent::Test::Driver::Input.new(Fluent::Plugin::ForwardInput) { if response_stub.nil? # do nothing because in_forward responds for ack option in default else define_method(:response) do |options| return response_stub.(options) end end }.configure(conf) end test 'heartbeat_type_none' do @d = d = create_driver(config + "\nheartbeat_type none") node = d.instance.nodes.first assert_equal Fluent::Plugin::ForwardOutput::NoneHeartbeatNode, node.class d.instance_start assert_nil d.instance.instance_variable_get(:@loop) # no HeartbeatHandler, or HeartbeatRequestTimer assert_nil d.instance.instance_variable_get(:@thread) # no HeartbeatHandler, or HeartbeatRequestTimer stub(node.failure).phi { raise 'Should not be called' } node.tick assert_true node.available? end test 'heartbeat_type_udp' do @d = d = create_driver(config + "\nheartbeat_type udp") d.instance_start usock = d.instance.instance_variable_get(:@usock) servers = d.instance.instance_variable_get(:@_servers) timers = d.instance.instance_variable_get(:@_timers) assert_equal Fluent::PluginHelper::Socket::WrappedSocket::UDP, usock.class assert_kind_of UDPSocket, usock assert servers.find{|s| s.title == :out_forward_heartbeat_receiver } assert timers.include?(:out_forward_heartbeat_request) mock(usock).send("\0", 0, Socket.pack_sockaddr_in(@target_port, '127.0.0.1')).once d.instance.send(:on_heartbeat_timer) end test 'acts_as_secondary' do i = Fluent::Plugin::ForwardOutput.new conf = config_element( 'match', 'primary.**', {'@type' => 'forward'}, [ config_element('server', '', {'host' => '127.0.0.1'}), config_element('secondary', '', {}, [ config_element('server', '', {'host' => '192.168.1.2'}), config_element('server', '', {'host' => '192.168.1.3'}) ]), ] ) assert_nothing_raised do i.configure(conf) end end test 'when out_forward has @id' do # cancel https://github.com/fluent/fluentd/blob/077508ac817b7637307434d0c978d7cdc3d1c534/lib/fluent/plugin_id.rb#L43-L53 # it always return true in test mock.proxy(Fluent::Plugin).new_sd('static', parent: anything) { |v| stub(v).plugin_id_for_test? { false } }.once output = Fluent::Test::Driver::Output.new(Fluent::Plugin::ForwardOutput) { def plugin_id_for_test? false end } assert_nothing_raised do output.configure(config + %[ @id unique_out_forward ]) end end sub_test_case 'verify_connection_at_startup' do test 'nodes are not available' do @d = d = create_driver(config + %[ verify_connection_at_startup true ]) e = assert_raise Fluent::UnrecoverableError do d.instance_start end if Fluent.windows? assert_match(/No connection could be made because the target machine actively refused it/, e.message) else assert_match(/Connection refused/, e.message) end d.instance_shutdown end test 'nodes_shared_key_miss_match' do input_conf = target_config + %[ self_hostname in.localhost shared_key fluentd-sharedkey ] target_input_driver = create_target_input_driver(conf: input_conf) output_conf = %[ transport tcp verify_connection_at_startup true self_hostname localhost shared_key key_miss_match host #{TARGET_HOST} port #{@target_port} ] @d = d = create_driver(output_conf) target_input_driver.run(expect_records: 1, timeout: 1) do e = assert_raise Fluent::UnrecoverableError do d.instance_start end assert_match(/failed to establish connection/, e.message) end end test 'nodes_shared_key_miss_match with TLS' do input_conf = target_config + %[ self_hostname in.localhost shared_key fluentd-sharedkey insecure true ] target_input_driver = create_target_input_driver(conf: input_conf) output_conf = %[ transport tls tls_insecure_mode true verify_connection_at_startup true self_hostname localhost shared_key key_miss_match host #{TARGET_HOST} port #{@target_port} ] @d = d = create_driver(output_conf) target_input_driver.run(expect_records: 1, timeout: 1) do e = assert_raise Fluent::UnrecoverableError do d.instance_start end assert_match(/failed to establish connection/, e.message) end end test 'nodes_shared_key_match' do input_conf = target_config + %[ self_hostname in.localhost shared_key fluentd-sharedkey ] target_input_driver = create_target_input_driver(conf: input_conf) output_conf = %[ verify_connection_at_startup true self_hostname localhost shared_key fluentd-sharedkey name test host #{TARGET_HOST} port #{@target_port} ] @d = d = create_driver(output_conf) time = event_time("2011-01-02 13:14:15 UTC") records = [{ "a" => 1 }, { "a" => 2 }] target_input_driver.run(expect_records: 2, timeout: 3) do d.run(default_tag: 'test') do records.each do |record| d.feed(time, record) end end end events = target_input_driver.events assert_false events.empty? assert_equal(['test', time, records[0]], events[0]) assert_equal(['test', time, records[1]], events[1]) end end test 'Create new connection per send_data' do target_input_driver = create_target_input_driver(conf: target_config) output_conf = config d = create_driver(output_conf) d.instance_start begin chunk = Fluent::Plugin::Buffer::MemoryChunk.new(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil)) mock.proxy(d.instance).socket_create_tcp(TARGET_HOST, @target_port, linger_timeout: anything, send_timeout: anything, recv_timeout: anything, connect_timeout: anything ) { |sock| mock(sock).close.once; sock }.twice target_input_driver.run(timeout: 15) do d.run(shutdown: false) do node = d.instance.nodes.first 2.times do node.send_data('test', chunk) rescue nil end end end ensure d.instance_shutdown end end test 'if no available node' do # do not create output driver d = create_driver(%[ name test standby host #{TARGET_HOST} port #{@target_port} ]) d.instance_start assert_nothing_raised { d.run } end sub_test_case 'keepalive' do test 'Do not create connection per send_data' do target_input_driver = create_target_input_driver(conf: target_config) output_conf = config + %[ keepalive true keepalive_timeout 2 ] d = create_driver(output_conf) d.instance_start begin chunk = Fluent::Plugin::Buffer::MemoryChunk.new(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil)) mock.proxy(d.instance).socket_create_tcp(TARGET_HOST, @target_port, linger_timeout: anything, send_timeout: anything, recv_timeout: anything, connect_timeout: anything ) { |sock| mock(sock).close.once; sock }.once target_input_driver.run(timeout: 15) do d.run(shutdown: false) do node = d.instance.nodes.first 2.times do node.send_data('test', chunk) rescue nil end end end ensure d.instance_shutdown end end test 'create timer of purging obsolete sockets' do output_conf = config + %[keepalive true] d = create_driver(output_conf) mock(d.instance).timer_execute(:out_forward_heartbeat_request, 1).once mock(d.instance).timer_execute(:out_forward_keep_alived_socket_watcher, 5).once d.instance_start end sub_test_case 'with require_ack_response' do test 'Create connection per send_data' do target_input_driver = create_target_input_driver(conf: target_config) output_conf = config + %[ require_ack_response true keepalive true keepalive_timeout 2 ] d = create_driver(output_conf) d.instance_start chunk = Fluent::Plugin::Buffer::MemoryChunk.new(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil)) mock.proxy(d.instance).socket_create_tcp(TARGET_HOST, @target_port, linger_timeout: anything, send_timeout: anything, recv_timeout: anything, connect_timeout: anything) { |sock| mock(sock).close.once; sock }.twice target_input_driver.run(timeout: 15) do d.run do node = d.instance.nodes.first 2.times do node.send_data('test', chunk) rescue nil end end end end end end end