require_relative '../helper'
require 'fluent/test/driver/input'
require 'fluent/test/startup_shutdown'
require 'base64'

require 'fluent/env'
require 'fluent/event'
require 'fluent/plugin/in_forward'
require 'fluent/plugin/compressable'

require 'timecop'

# Tests for Fluent::Plugin::ForwardInput.
#
# Most tests start the plugin through the input test driver, open a real TCP
# connection to 127.0.0.1:PORT with +send_data+, and compare the events the
# driver collected with the records that were sent. Tests parameterized with
# +data(tcp: ..., auth: ...)+ run once without and once with the handshake
# performed by +simulate_auth_sequence+.
#
# NOTE: do not add a `frozen_string_literal: true` magic comment to this file;
# several tests append packed data to buffers created with a '' literal.
class ForwardInputTest < Test::Unit::TestCase
  include Fluent::Plugin::Compressable

  def setup
    Fluent::Test.setup
    @responses = []  # for testing responses after sending data
    @d = nil
  end

  def teardown
    @d.instance_shutdown if @d
  end

  PORT = unused_port

  SHARED_KEY = 'foobar1'
  USER_NAME = 'tagomoris'
  USER_PASSWORD = 'fluentd'

  CONFIG = %[
    port #{PORT}
    bind 127.0.0.1
  ]

  # Resolves the host name reported for a peer at 127.0.0.1 (reverse lookup
  # enabled). The socket is only needed for the lookup, so it is closed before
  # returning.
  LOCALHOST_HOSTNAME_GETTER = ->() {
    sock = UDPSocket.new(::Socket::AF_INET)
    begin
      sock.do_not_reverse_lookup = false
      sock.connect("127.0.0.1", 2048)
      sock.peeraddr[2]
    ensure
      sock.close
    end
  }
  LOCALHOST_HOSTNAME = LOCALHOST_HOSTNAME_GETTER.call

  # Stand-in for a connection object when calling #on_message directly.
  DUMMY_SOCK = Struct.new(:remote_host, :remote_addr, :remote_port).new(LOCALHOST_HOSTNAME, "127.0.0.1", 0)

  # Configuration with shared-key and user authentication enabled: one user and
  # one client entry, as asserted by the '#configure' / 'auth' test.
  CONFIG_AUTH = %[
    port #{PORT}
    bind 127.0.0.1
    <security>
      self_hostname localhost
      shared_key foobar1
      user_auth true
      <user>
        username #{USER_NAME}
        password #{USER_PASSWORD}
      </user>
      <client>
        network 127.0.0.0/8
        shared_key #{SHARED_KEY}
        users ["#{USER_NAME}"]
      </client>
    </security>
  ]

  # Builds an input test driver for ForwardInput configured with +conf+.
  def create_driver(conf=CONFIG)
    Fluent::Test::Driver::Input.new(Fluent::Plugin::ForwardInput).configure(conf)
  end

  sub_test_case '#configure' do
    test 'simple' do
      @d = d = create_driver
      assert_equal PORT, d.instance.port
      assert_equal '127.0.0.1', d.instance.bind
      assert_equal 0, d.instance.linger_timeout
      assert_equal 0.5, d.instance.blocking_timeout
      assert !d.instance.backlog
    end

    test 'auth' do
      @d = d = create_driver(CONFIG_AUTH)
      assert_equal PORT, d.instance.port
      assert_equal '127.0.0.1', d.instance.bind
      assert_equal 0, d.instance.linger_timeout
      assert !d.instance.backlog

      assert d.instance.security
      assert_equal 1, d.instance.security.users.size
      assert_equal 1, d.instance.security.clients.size
    end
  end

  sub_test_case 'message' do
    test 'time' do
      time = event_time("2011-01-02 13:14:15 UTC")
      begin
        Timecop.freeze(Time.at(time))
        @d = d = create_driver

        records = [
          ["tag1", time, {"a"=>1}],
          ["tag2", time, {"a"=>2}],
        ]

        d.run(expect_records: records.length, timeout: 5) do
          records.each {|tag, _time, record|
            # time 0 is sent on purpose; the frozen clock value is expected back
            send_data packer.write([tag, 0, record]).to_s
          }
        end
        assert_equal(records, d.events.sort_by {|a| a[0] })
      ensure
        Timecop.return
      end
    end

    test 'plain' do
      @d = d = create_driver

      time = event_time("2011-01-02 13:14:15 UTC")

      records = [
        ["tag1", time, {"a"=>1}],
        ["tag2", time, {"a"=>2}],
      ]

      d.run(expect_records: records.length, timeout: 5) do
        records.each {|tag, _time, record|
          send_data packer.write([tag, _time, record]).to_s
        }
      end
      # each record travels over its own socket, so sort by tag before comparing
      assert_equal(records, d.events.sort_by {|a| a[0] })
    end

    test 'time_as_integer' do
      @d = d = create_driver

      time_i = event_time("2011-01-02 13:14:15 UTC").to_i

      records = [
        ["tag1", time_i, {"a"=>1}],
        ["tag2", time_i, {"a"=>2}],
      ]

      d.run(expect_records: records.length, timeout: 5) do
        records.each {|tag, _time, record|
          send_data packer.write([tag, _time, record]).to_s
        }
      end

      # each record travels over its own socket, so sort by tag before comparing
      assert_equal(records, d.events.sort_by {|a| a[0] })
    end

    test 'skip_invalid_event' do
      @d = d = create_driver(CONFIG + "skip_invalid_event true")

      time = event_time("2011-01-02 13:14:15 UTC")

      records = [
        ["tag1", time, {"a" => 1}],
        ["tag2", time, {"a" => 2}],
      ]

      d.run(shutdown: false, expect_records: 2, timeout: 10) do
        entries = []
        # These entries are skipped
        entries << ['tag1', true, {'a' => 3}] << ['tag2', time, 'invalid record']
        entries += records.map { |tag, _time, record| [tag, _time, record] }

        entries.each {|tag, _time, record|
          # Without ack, logs are sometimes not saved to logs during test.
          send_data packer.write([tag, _time, record]).to_s #, try_to_receive_response: true
        }
      end

      logs = d.instance.log.logs
      assert_equal 2, logs.count { |line| line =~ /got invalid event and drop it/ }
      # each entry travels over its own socket, so sort by tag before comparing
      assert_equal records, d.events.sort_by {|a| a[0] }

      d.instance_shutdown
    end

    test 'json_using_integer_time' do
      @d = d = create_driver

      time_i = event_time("2011-01-02 13:14:15 UTC").to_i

      records = [
        ["tag1", time_i, {"a"=>1}],
        ["tag2", time_i, {"a"=>2}],
      ]

      d.run(expect_records: records.length, timeout: 20) do
        records.each {|tag, _time, record|
          send_data [tag, _time, record].to_json
        }
      end

      # all records share one timestamp, so the tag is the only usable sort key
      assert_equal(records, d.events.sort_by {|a| a[0] })
    end

    test 'json_with_newline' do
      @d = d = create_driver

      time_i = event_time("2011-01-02 13:14:15 UTC").to_i

      records = [
        ["tag1", time_i, {"a"=>1}],
        ["tag2", time_i, {"a"=>2}],
      ]

      d.run(expect_records: records.length, timeout: 20) do
        records.each {|tag, _time, record|
          send_data [tag, _time, record].to_json + "\n"
          sleep 1
        }
      end

      # all records share one timestamp, so the tag is the only usable sort key
      assert_equal(records, d.events.sort_by {|a| a[0] })
    end
  end

  sub_test_case 'forward' do
    data(tcp: {
           config: CONFIG,
           options: {
             auth: false
           }
         },
         auth: {
           config: CONFIG_AUTH,
           options: {
             auth: true
           }
         })
    test 'plain' do |data|
      config = data[:config]
      options = data[:options]
      @d = d = create_driver(config)

      time = event_time("2011-01-02 13:14:15 UTC")

      records = [
        ["tag1", time, {"a"=>1}],
        ["tag1", time, {"a"=>2}]
      ]

      d.run(expect_records: records.length, timeout: 20) do
        entries = []
        records.each {|_tag, _time, record|
          entries << [_time, record]
        }
        send_data packer.write(["tag1", entries]).to_s, **options
      end
      assert_equal(records, d.events)
    end

    data(tcp: {
           config: CONFIG,
           options: {
             auth: false
           }
         },
         auth: {
           config: CONFIG_AUTH,
           options: {
             auth: true
           }
         })
    test 'time_as_integer' do |data|
      config = data[:config]
      options = data[:options]
      @d = d = create_driver(config)

      # integer seconds, as the test name says
      time_i = event_time("2011-01-02 13:14:15 UTC").to_i

      records = [
        ["tag1", time_i, {"a"=>1}],
        ["tag1", time_i, {"a"=>2}]
      ]

      d.run(expect_records: records.length, timeout: 20) do
        entries = []
        records.each {|_tag, _time, record|
          entries << [_time, record]
        }
        send_data packer.write(["tag1", entries]).to_s, **options
      end

      assert_equal(records, d.events)
    end

    data(tcp: {
           config: CONFIG,
           options: {
             auth: false
           }
         },
         auth: {
           config: CONFIG_AUTH,
           options: {
             auth: true
           }
         })
    test 'skip_invalid_event' do |data|
      config = data[:config]
      options = data[:options]
      @d = d = create_driver(config + "skip_invalid_event true")

      time = event_time("2011-01-02 13:14:15 UTC")

      records = [
        ["tag1", time, {"a" => 1}],
        ["tag1", time, {"a" => 2}],
      ]

      d.run(shutdown: false, expect_records: records.length, timeout: 20) do
        entries = records.map { |_tag, _time, record| [_time, record] }
        # These entries are skipped
        entries << ['invalid time', {'a' => 3}] << [time, 'invalid record']

        send_data packer.write(["tag1", entries]).to_s, **options
      end

      logs = d.instance.log.out.logs
      assert{ logs.select{|line| line =~ /skip invalid event/ }.size == 2 }

      d.instance_shutdown
    end
  end

  sub_test_case 'packed forward' do
    data(tcp: {
           config: CONFIG,
           options: {
             auth: false
           }
         },
         auth: {
           config: CONFIG_AUTH,
           options: {
             auth: true
           }
         })
    test 'plain' do |data|
      config = data[:config]
      options = data[:options]
      @d = d = create_driver(config)

      time = event_time("2011-01-02 13:14:15 UTC")

      records = [
        ["tag1", time, {"a"=>1}],
        ["tag1", time, {"a"=>2}],
      ]

      d.run(expect_records: records.length, timeout: 20) do
        entries = ''
        records.each {|_tag, _time, record|
          packer(entries).write([_time, record]).flush
        }
        send_data packer.write(["tag1", entries]).to_s, **options
      end
      assert_equal(records, d.events)
    end

    data(tcp: {
           config: CONFIG,
           options: {
             auth: false
           }
         },
         auth: {
           config: CONFIG_AUTH,
           options: {
             auth: true
           }
         })
    test 'time_as_integer' do |data|
      config = data[:config]
      options = data[:options]
      @d = d = create_driver(config)

      time_i = event_time("2011-01-02 13:14:15 UTC").to_i

      records = [
        ["tag1", time_i, {"a"=>1}],
        ["tag1", time_i, {"a"=>2}],
      ]

      d.run(expect_records: records.length, timeout: 20) do
        entries = ''
        records.each {|_tag, _time, record|
          packer(entries).write([_time, record]).flush
        }
        send_data packer.write(["tag1", entries]).to_s, **options
      end
      assert_equal(records, d.events)
    end

    data(tcp: {
           config: CONFIG,
           options: {
             auth: false
           }
         },
         auth: {
           config: CONFIG_AUTH,
           options: {
             auth: true
           }
         })
    test 'skip_invalid_event' do |data|
      config = data[:config]
      options = data[:options]
      @d = d = create_driver(config + "skip_invalid_event true")

      time = event_time("2011-01-02 13:14:15 UTC")

      records = [
        ["tag1", time, {"a" => 1}],
        ["tag1", time, {"a" => 2}],
      ]

      d.run(shutdown: false, expect_records: records.length, timeout: 20) do
        entries = records.map { |_tag, _time, record| [_time, record] }
        # These entries are skipped
        entries << ['invalid time', {'a' => 3}] << [time, 'invalid record']

        packed_entries = ''
        entries.each { |_time, record|
          packer(packed_entries).write([_time, record]).flush
        }
        send_data packer.write(["tag1", packed_entries]).to_s, **options
      end

      logs = d.instance.log.logs
      assert_equal 2, logs.count { |line| line =~ /skip invalid event/ }

      d.instance_shutdown
    end
  end

  sub_test_case 'compressed packed forward' do
    test 'set_compress_to_option' do
      @d = d = create_driver

      time_i = event_time("2011-01-02 13:14:15 UTC").to_i
      events = [
        ["tag1", time_i, {"a"=>1}],
        ["tag1", time_i, {"a"=>2}]
      ]

      # create compressed entries
      entries = ''
      events.each do |_tag, _time, record|
        v = [_time, record].to_msgpack
        entries << compress(v)
      end
      chunk = ["tag1", entries, { 'compressed' => 'gzip' }].to_msgpack

      d.run do
        Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
          option = d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK)
          assert_equal 'gzip', option['compressed']
        end
      end

      assert_equal events, d.events
    end

    test 'create_CompressedMessagePackEventStream_with_gzip_compress_option' do
      @d = d = create_driver

      time_i = event_time("2011-01-02 13:14:15 UTC").to_i
      events = [
        ["tag1", time_i, {"a"=>1}],
        ["tag1", time_i, {"a"=>2}]
      ]

      # create compressed entries
      entries = ''
      events.each do |_tag, _time, record|
        v = [_time, record].to_msgpack
        entries << compress(v)
      end
      chunk = ["tag1", entries, { 'compressed' => 'gzip' }].to_msgpack

      # check CompressedMessagePackEventStream is created
      mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0)

      d.run do
        Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
          option = d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK)
          assert_equal 'gzip', option['compressed']
        end
      end
    end
  end

  sub_test_case 'warning' do
    test 'send_large_chunk_warning' do
      @d = d = create_driver(CONFIG + %[
        chunk_size_warn_limit 16M
        chunk_size_limit 32M
      ])

      time = event_time("2014-04-25 13:14:15 UTC")

      # generate over 16M chunk
      str = "X" * 1024 * 1024
      chunk = [ "test.tag", (0...16).map{|i| [time + i, {"data" => str}] } ].to_msgpack
      assert chunk.size > (16 * 1024 * 1024)
      assert chunk.size < (32 * 1024 * 1024)

      d.run(shutdown: false) do
        Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
          d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK)
        end
      end

      # check emitted data
      emits = d.events
      assert_equal 16, emits.size
      assert emits.map(&:first).all?{|t| t == "test.tag" }
      assert_equal((0...16).to_a, emits.map{|_tag, t, _record| t - time })

      # check log
      logs = d.instance.log.logs
      assert_equal 1, logs.select{|line|
        line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_warn_limit':/ &&
        line =~ / tag="test.tag" host="#{LOCALHOST_HOSTNAME}" limit=16777216 size=16777501/
      }.size, "large chunk warning is not logged"

      d.instance_shutdown
    end

    test 'send_large_chunk_only_warning' do
      @d = d = create_driver(CONFIG + %[
        chunk_size_warn_limit 16M
      ])
      time = event_time("2014-04-25 13:14:15 UTC")

      # generate over 16M chunk
      str = "X" * 1024 * 1024
      chunk = [ "test.tag", (0...16).map{|i| [time + i, {"data" => str}] } ].to_msgpack

      d.run(shutdown: false) do
        Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
          d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK)
        end
      end

      # check log
      logs = d.instance.log.logs
      assert_equal 1, logs.select{ |line|
        line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_warn_limit':/ &&
        line =~ / tag="test.tag" host="#{LOCALHOST_HOSTNAME}" limit=16777216 size=16777501/
      }.size, "large chunk warning is not logged"

      d.instance_shutdown
    end

    test 'send_large_chunk_limit' do
      @d = d = create_driver(CONFIG + %[
        chunk_size_warn_limit 16M
        chunk_size_limit 32M
      ])

      time = event_time("2014-04-25 13:14:15 UTC")

      # generate over 32M chunk
      str = "X" * 1024 * 1024
      chunk = [ "test.tag", (0...32).map{|i| [time + i, {"data" => str}] } ].to_msgpack
      assert chunk.size > (32 * 1024 * 1024)

      # d.run => send_data
      d.run(shutdown: false) do
        Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
          d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK)
        end
      end

      # check emitted data
      emits = d.events
      assert_equal 0, emits.size

      # check log
      logs = d.instance.log.logs
      assert_equal 1, logs.select{|line|
        line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_limit', dropped:/ &&
        line =~ / tag="test.tag" host="#{LOCALHOST_HOSTNAME}" limit=33554432 size=33554989/
      }.size, "large chunk warning is not logged"

      d.instance_shutdown
    end

    data('string chunk' => 'broken string',
         'integer chunk' => 10)
    test 'send_broken_chunk' do |data|
      @d = d = create_driver

      # d.run => send_data
      d.run(shutdown: false) do
        d.instance.send(:on_message, data, 1000000000, DUMMY_SOCK)
      end

      # check emitted data
      assert_equal 0, d.events.size

      # check log
      logs = d.instance.log.logs
      assert_equal 1, logs.select{|line|
        line =~ / \[warn\]: incoming chunk is broken: host="#{LOCALHOST_HOSTNAME}" msg=#{data.inspect}/
      }.size, "should not accept broken chunk"

      d.instance_shutdown
    end
  end

  sub_test_case 'respond to required ack' do
    data(tcp: {
           config: CONFIG,
           options: {
             auth: false
           }
         },
         auth: {
           config: CONFIG_AUTH,
           options: {
             auth: true
           }
         })
    test 'message' do |data|
      config = data[:config]
      options = data[:options]
      @d = d = create_driver(config)

      time = event_time("2011-01-02 13:14:15 UTC")

      events = [
        ["tag1", time, {"a"=>1}],
        ["tag2", time, {"a"=>2}]
      ]

      expected_acks = []

      d.run(expect_records: events.size) do
        events.each {|tag, _time, record|
          op = { 'chunk' => Base64.encode64(record.object_id.to_s) }
          expected_acks << op['chunk']
          send_data [tag, _time, record, op].to_msgpack, try_to_receive_response: true, **options
        }
      end

      assert_equal events, d.events
      assert_equal expected_acks, @responses.map { |res| MessagePack.unpack(res)['ack'] }
    end

    # FIX: response is not pushed into @responses because IO.select has been blocked until InputForward shutdowns
    data(tcp: {
           config: CONFIG,
           options: {
             auth: false
           }
         },
         auth: {
           config: CONFIG_AUTH,
           options: {
             auth: true
           }
         })
    test 'forward' do |data|
      config = data[:config]
      options = data[:options]
      @d = d = create_driver(config)

      time = event_time("2011-01-02 13:14:15 UTC")

      events = [
        ["tag1", time, {"a"=>1}],
        ["tag1", time, {"a"=>2}]
      ]

      expected_acks = []

      d.run(expect_records: events.size) do
        entries = []
        events.each {|_tag, _time, record|
          entries << [_time, record]
        }
        op = { 'chunk' => Base64.encode64(entries.object_id.to_s) }
        expected_acks << op['chunk']
        send_data ["tag1", entries, op].to_msgpack, try_to_receive_response: true, **options
      end

      assert_equal events, d.events
      assert_equal expected_acks, @responses.map { |res| MessagePack.unpack(res)['ack'] }
    end

    data(tcp: {
           config: CONFIG,
           options: {
             auth: false
           }
         },
         auth: {
           config: CONFIG_AUTH,
           options: {
             auth: true
           }
         })
    test 'packed_forward' do |data|
      config = data[:config]
      options = data[:options]
      @d = d = create_driver(config)

      time = event_time("2011-01-02 13:14:15 UTC")

      events = [
        ["tag1", time, {"a"=>1}],
        ["tag1", time, {"a"=>2}]
      ]

      expected_acks = []

      d.run(expect_records: events.size) do
        entries = ''
        events.each {|_tag, _time, record|
          # pack each event's own timestamp rather than the outer local
          [_time, record].to_msgpack(entries)
        }
        op = { 'chunk' => Base64.encode64(entries.object_id.to_s) }
        expected_acks << op['chunk']
        send_data ["tag1", entries, op].to_msgpack, try_to_receive_response: true, **options
      end

      assert_equal events, d.events
      assert_equal expected_acks, @responses.map { |res| MessagePack.unpack(res)['ack'] }
    end

    data(
      tcp: {
        config: CONFIG,
        options: {
          auth: false
        }
      },
      ### Auth is not supported with json
      # auth: {
      #   config: CONFIG_AUTH,
      #   options: {
      #     auth: true
      #   }
      # },
    )
    test 'message_json' do |data|
      config = data[:config]
      options = data[:options]
      @d = d = create_driver(config)

      # integer seconds, matching the JSON test in 'not respond without required ack'
      time_i = event_time("2011-01-02 13:14:15 UTC").to_i

      events = [
        ["tag1", time_i, {"a"=>1}],
        ["tag2", time_i, {"a"=>2}]
      ]

      expected_acks = []

      d.run(expect_records: events.size, timeout: 20) do
        events.each {|tag, _time, record|
          op = { 'chunk' => Base64.encode64(record.object_id.to_s) }
          expected_acks << op['chunk']
          send_data [tag, _time, record, op].to_json, try_to_receive_response: true, **options
        }
      end

      assert_equal events, d.events
      assert_equal expected_acks, @responses.map { |res| JSON.parse(res)['ack'] }
    end
  end

  sub_test_case 'not respond without required ack' do
    data(tcp: {
           config: CONFIG,
           options: {
             auth: false
           }
         },
         auth: {
           config: CONFIG_AUTH,
           options: {
             auth: true
           }
         })
    test 'message' do |data|
      config = data[:config]
      options = data[:options]
      @d = d = create_driver(config)

      time = event_time("2011-01-02 13:14:15 UTC")

      events = [
        ["tag1", time, {"a"=>1}],
        ["tag2", time, {"a"=>2}]
      ]

      d.run(expect_records: events.size, timeout: 20) do
        events.each {|tag, _time, record|
          send_data [tag, _time, record].to_msgpack, try_to_receive_response: true, **options
        }
      end

      assert_equal events, d.events
      assert_equal [nil, nil], @responses
    end

    data(tcp: {
           config: CONFIG,
           options: {
             auth: false
           }
         },
         auth: {
           config: CONFIG_AUTH,
           options: {
             auth: true
           }
         })
    test 'forward' do |data|
      config = data[:config]
      options = data[:options]
      @d = d = create_driver(config)

      time = event_time("2011-01-02 13:14:15 UTC")

      events = [
        ["tag1", time, {"a"=>1}],
        ["tag1", time, {"a"=>2}]
      ]

      d.run(expect_records: events.size, timeout: 20) do
        entries = []
        events.each {|_tag, _time, record|
          entries << [_time, record]
        }
        send_data ["tag1", entries].to_msgpack, try_to_receive_response: true, **options
      end

      assert_equal events, d.events
      assert_equal [nil], @responses
    end

    data(tcp: {
           config: CONFIG,
           options: {
             auth: false
           }
         },
         auth: {
           config: CONFIG_AUTH,
           options: {
             auth: true
           }
         })
    test 'packed_forward' do |data|
      config = data[:config]
      options = data[:options]
      @d = d = create_driver(config)

      time = event_time("2011-01-02 13:14:15 UTC")

      events = [
        ["tag1", time, {"a"=>1}],
        ["tag1", time, {"a"=>2}]
      ]

      d.run(expect_records: events.size, timeout: 20) do
        entries = ''
        events.each {|_tag, _time, record|
          [_time, record].to_msgpack(entries)
        }
        send_data ["tag1", entries].to_msgpack, try_to_receive_response: true, **options
      end

      assert_equal events, d.events
      assert_equal [nil], @responses
    end

    data(
      tcp: {
        config: CONFIG,
        options: {
          auth: false
        }
      },
      ### Auth is not supported with json
      # auth: {
      #   config: CONFIG_AUTH,
      #   options: {
      #     auth: true
      #   }
      # },
    )
    test 'message_json' do |data|
      config = data[:config]
      options = data[:options]
      @d = d = create_driver(config)

      time_i = event_time("2011-01-02 13:14:15 UTC").to_i

      events = [
        ["tag1", time_i, {"a"=>1}],
        ["tag2", time_i, {"a"=>2}]
      ]

      d.run(expect_records: events.size, timeout: 20) do
        events.each {|tag, _time, record|
          send_data [tag, _time, record].to_json, try_to_receive_response: true, **options
        }
      end

      assert_equal events, d.events
      assert_equal [nil, nil], @responses
    end
  end

  # Returns a msgpack packer from the engine's factory; +args+ are passed
  # through (the tests pass a String buffer to append packed data to).
  def packer(*args)
    Fluent::Engine.msgpack_factory.packer(*args)
  end

  # Returns a msgpack unpacker from the engine's factory.
  def unpacker
    Fluent::Engine.msgpack_factory.unpacker
  end

  # Reads from +io+ until the block returns a truthy value for the data
  # accumulated so far, the peer disconnects, or +timeout+ seconds elapse.
  #
  # Return value:
  #   String (non-empty): data received so far
  #   ''  : socket is disconnected without any data
  #   nil : socket read timeout (no data arrived at all)
  def read_data(io, timeout, &block)
    res = ''
    select_timeout = 2
    # CLOCK_MONOTONIC_RAW is not defined on every platform
    clock_id = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC
    timeout_at = Process.clock_gettime(clock_id) + timeout
    begin
      buf = ''
      io_activated = false
      while Process.clock_gettime(clock_id) < timeout_at
        if IO.select([io], nil, nil, select_timeout)
          io_activated = true
          buf = io.readpartial(2048)
          res ||= ''
          res << buf

          break if block.call(res)
        end
      end
      res = nil unless io_activated # timeout without no data arrival
    rescue Errno::EAGAIN
      sleep 0.01
      retry if res == ''
      # if res is not empty, all data in socket buffer are read, so do not retry
    rescue IOError, EOFError, Errno::ECONNRESET
      # socket disconnected
    end
    res
  end

  # Plays the client side of the HELO / PING / PONG handshake on +io+.
  #
  # Returns true on success and raises RuntimeError when a packet times out,
  # the connection is closed, a header is malformed, the server reports an
  # authentication failure, or the shared key digest in PONG does not match.
  def simulate_auth_sequence(io, shared_key=SHARED_KEY, username=USER_NAME, password=USER_PASSWORD)
    auth_response_timeout = 30
    shared_key_salt = 'salt'

    # reading helo
    helo_data = read_data(io, auth_response_timeout){|data| MessagePack.unpack(data) rescue nil }
    raise "Authentication packet timeout" unless helo_data
    raise "Authentication connection closed" if helo_data == ''
    # ['HELO', options(hash)]
    helo = MessagePack.unpack(helo_data)
    raise "Invalid HELO header" unless helo[0] == 'HELO'
    raise "Invalid HELO option object" unless helo[1].is_a?(Hash)
    @options = helo[1]

    # sending ping
    ping = [
      'PING',
      'selfhostname',
      shared_key_salt,
      Digest::SHA512.new
        .update(shared_key_salt)
        .update('selfhostname')
        .update(@options['nonce'])
        .update(shared_key).hexdigest,
    ]
    if @options['auth'] # auth enabled -> value is auth salt
      pass_digest = Digest::SHA512.new.update(@options['auth']).update(username).update(password).hexdigest
      ping.push(username, pass_digest)
    else
      ping.push('', '')
    end
    io.write ping.to_msgpack
    io.flush

    # reading pong
    pong_data = read_data(io, auth_response_timeout){|data| MessagePack.unpack(data) rescue nil }
    raise "PONG packet timeout" unless pong_data
    raise "PONG connection closed" if pong_data == ''
    # ['PONG', bool(auth_result), string(reason_if_failed), self_hostname, shared_key_digest]
    pong = MessagePack.unpack(pong_data)
    raise "Invalid PONG header" unless pong[0] == 'PONG'
    raise "Authentication Failure: #{pong[2]}" unless pong[1]
    clientside_calculated = Digest::SHA512.new
      .update(shared_key_salt)
      .update(pong[3])
      .update(@options['nonce'])
      .update(shared_key).hexdigest
    raise "Shared key digest mismatch" unless clientside_calculated == pong[4]

    # authentication success
    true
  end

  # Opens a TCP connection to the port the plugin under test is bound to.
  def connect
    TCPSocket.new('127.0.0.1', PORT)
  end

  # Sends +data+ over a fresh connection, optionally performing the auth
  # handshake first. With +try_to_receive_response+, the result of #read_data
  # (a response String, '' or nil) is appended to @responses.
  #
  # Data ordering is not assured:
  #  Records in different sockets are processed on different thread, so its scheduling make effect
  #  on order of emitted records.
  #  So, we MUST sort emitted records in different `send_data` before assertion.
  def send_data(data, try_to_receive_response: false, response_timeout: 5, auth: false)
    io = connect

    if auth
      simulate_auth_sequence(io)
    end

    io.write data
    io.flush
    if try_to_receive_response
      @responses << read_data(io, response_timeout){|d| MessagePack.unpack(d) rescue nil }
    end
  ensure
    io.close rescue nil # SSL socket requires any writes to close sockets
  end

  sub_test_case 'source_hostname_key and source_address_key features' do
    data(
      both: [:hostname, :address],
      hostname: [:hostname],
      address: [:address],
    )
    test 'message protocol' do |keys|
      execute_test_with_source_hostname_key(*keys) { |events|
        events.each { |tag, time, record|
          send_data [tag, time, record].to_msgpack
        }
      }
    end

    data(
      both: [:hostname, :address],
      hostname: [:hostname],
      address: [:address],
    )
    test 'forward protocol' do |keys|
      execute_test_with_source_hostname_key(*keys) { |events|
        entries = []
        events.each {|_tag, time, record|
          entries << [time, record]
        }
        send_data ['tag1', entries].to_msgpack
      }
    end

    data(
      both: [:hostname, :address],
      hostname: [:hostname],
      address: [:address],
    )
    test 'packed forward protocol' do |keys|
      execute_test_with_source_hostname_key(*keys) { |events|
        entries = ''
        events.each { |_tag, time, record|
          Fluent::Engine.msgpack_factory.packer(entries).write([time, record]).flush
        }
        send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s
      }
    end
  end

  # Runs the plugin with source_hostname_key and/or source_address_key enabled
  # (selected by +keys+: :hostname, :address), yields the events to send to
  # +block+, then checks that every emitted record carries the matching
  # 'source_hostname' / 'source_address' field.
  def execute_test_with_source_hostname_key(*keys, &block)
    conf = CONFIG.dup
    if keys.include?(:hostname)
      conf << "source_hostname_key source_hostname\n"
    end
    if keys.include?(:address)
      conf << "source_address_key source_address\n"
    end
    @d = d = create_driver(conf)

    time = event_time("2011-01-02 13:14:15 UTC")
    events = [
      ["tag1", time, {"a"=>1}],
      ["tag1", time, {"a"=>2}]
    ]

    d.run(expect_records: events.size) do
      block.call(events)
    end

    d.events.each { |_tag, _time, record|
      if keys.include?(:hostname)
        assert_true record.has_key?('source_hostname')
        assert_equal DUMMY_SOCK.remote_host, record['source_hostname']
      end
      if keys.include?(:address)
        assert_true record.has_key?('source_address')
        assert_equal DUMMY_SOCK.remote_addr, record['source_address']
      end
    }
  end

  # TODO heartbeat
end