require 'aws-sdk-s3'
require 'aws-sdk-sqs'
require 'aws-sdk-sqs/queue_poller'

require 'fluent/test'
require 'fluent/test/helpers'
require 'fluent/test/log'
require 'fluent/test/driver/input'
require 'fluent/plugin/in_s3'

require 'test/unit/rr'
require 'zlib'
require 'fileutils'

include Fluent::Test::Helpers

# Tests for Fluent::Plugin::S3Input: configuration handling plus
# end-to-end runs against RR doubles standing in for the AWS S3 and SQS clients.
class S3InputTest < Test::Unit::TestCase
  # Pins "now" to a fixed instant so every run sees the same time.
  def setup
    Fluent::Test.setup
    @time = event_time("2015-09-30 13:14:15 UTC")
    Fluent::Engine.now = @time
    if Fluent.const_defined?(:EventTime)
      stub(Fluent::EventTime).now { @time }
    end
  end

  # Baseline plugin configuration. The queue settings live in an <sqs>
  # section, which test_default reads back through d.instance.sqs.
  CONFIG = %[
    aws_key_id test_key_id
    aws_sec_key test_sec_key
    s3_bucket test_bucket
    buffer_type memory
    <sqs>
      queue_name test_queue
      queue_owner_aws_account_id 123456789123
    </sqs>
  ]

  # Builds an input test driver for the plugin configured with +conf+.
  def create_driver(conf = CONFIG)
    Fluent::Test::Driver::Input.new(Fluent::Plugin::S3Input).configure(conf)
  end

  # Configuration-only tests; none of these start the plugin.
  class ConfigTest < self
    def test_default
      d = create_driver
      extractor = d.instance.instance_variable_get(:@extractor)
      actual = {
        aws_key_id: d.instance.aws_key_id,
        aws_sec_key: d.instance.aws_sec_key,
        s3_bucket: d.instance.s3_bucket,
        s3_region: d.instance.s3_region,
        sqs_queue_name: d.instance.sqs.queue_name,
        extractor_ext: extractor.ext,
        extractor_content_type: extractor.content_type
      }
      expected = {
        aws_key_id: "test_key_id",
        aws_sec_key: "test_sec_key",
        s3_bucket: "test_bucket",
        s3_region: "us-east-1",
        sqs_queue_name: "test_queue",
        extractor_ext: "gz",
        extractor_content_type: "application/x-gzip"
      }
      assert_equal(expected, actual)
    end

    def test_empty
      assert_raise(Fluent::ConfigError) do
        create_driver("")
      end
    end

    def test_without_sqs_section
      conf = %[
        aws_key_id test_key_id
        aws_sec_key test_sec_key
        s3_bucket test_bucket
      ]
      assert_raise_message("'<sqs>' sections are required") do
        create_driver(conf)
      end
    end

    def test_unknown_store_as
      config = CONFIG + "\nstore_as unknown"
      assert_raise(Fluent::ConfigError) do
        create_driver(config)
      end
    end

    # Each entry is [store_as value, expected extension, expected content type].
    data("json" => ["json", "json", "application/json"],
         "text" => ["text", "txt", "text/plain"],
         "gzip" => ["gzip", "gz", "application/x-gzip"],
         "gzip_command" => ["gzip_command", "gz", "application/x-gzip"],
         "lzo" => ["lzo", "lzo", "application/x-lzop"],
         "lzma2" => ["lzma2", "xz", "application/x-xz"])
    def test_extractor(data)
      store_type, ext, content_type = data
      config = CONFIG + "\nstore_as #{store_type}\n"
      d = create_driver(config)
      extractor = d.instance.instance_variable_get(:@extractor)
      expected = {
        ext: ext,
        content_type: content_type
      }
      actual = {
        ext: extractor.ext,
        content_type: extractor.content_type
      }
      assert_equal(expected, actual)
    rescue Fluent::ConfigError => e
      # A ConfigError here marks the case as pending instead of failing;
      # presumably the extractor's external command is not installed.
      pend(e.message)
    end
  end

  def test_s3_endpoint_with_valid_endpoint
    # Leading newline keeps the parameter on its own line regardless of how
    # CONFIG happens to end (same convention as the other tests below).
    d = create_driver(CONFIG + "\ns3_endpoint riak-cs.example.com")
    assert_equal 'riak-cs.example.com', d.instance.s3_endpoint
  end

  data('US West (Oregon)' => 's3-us-west-2.amazonaws.com',
       'EU (Frankfurt)' => 's3.eu-central-1.amazonaws.com',
       'Asia Pacific (Tokyo)' => 's3-ap-northeast-1.amazonaws.com')
  def test_s3_endpoint_with_invalid_endpoint(endpoint)
    # NOTE(review): the string argument to assert_raise is the failure message
    # shown when nothing is raised; it is not matched against the exception text.
    assert_raise(Fluent::ConfigError, "s3_endpoint parameter is not supported, use s3_region instead. This parameter is for S3 compatible services") {
      create_driver(CONFIG + "\ns3_endpoint #{endpoint}")
    }
  end

  data('US West (Oregon)' => 's3-us-west-2.amazonaws.com',
       'EU (Frankfurt)' => 's3.eu-central-1.amazonaws.com',
       'Asia Pacific (Tokyo)' => 's3-ap-northeast-1.amazonaws.com')
  def test_sqs_endpoint_with_invalid_endpoint(endpoint)
    assert_raise(Fluent::ConfigError, "sqs.endpoint parameter is not supported, use s3_region instead. This parameter is for SQS compatible services") {
      conf = <<"EOS"
aws_key_id test_key_id
aws_sec_key test_sec_key
s3_bucket test_bucket
buffer_type memory
<sqs>
  queue_name test_queue
  endpoint #{endpoint}
</sqs>
EOS
      create_driver(conf)
    }
  end

  Struct.new("StubResponse", :queue_url)
  Struct.new("StubMessage", :message_id, :receipt_handle, :body)

  # Replaces the AWS S3/SQS client, resource, bucket and queue poller
  # constructors with RR doubles so that no network access happens.
  def setup_mocks
    @s3_client = stub(Aws::S3::Client.new(stub_responses: true))
    mock(Aws::S3::Client).new(anything).at_least(0) { @s3_client }
    @s3_resource = mock(Aws::S3::Resource.new(client: @s3_client))
    mock(Aws::S3::Resource).new(client: @s3_client) { @s3_resource }
    @s3_bucket = mock(Aws::S3::Bucket.new(name: "test", client: @s3_client))
    @s3_bucket.exists? { true }
    @s3_resource.bucket(anything) { @s3_bucket }

    test_queue_url = "http://example.com/test_queue"
    @sqs_client = stub(Aws::SQS::Client.new(stub_responses: true))
    @sqs_response = stub(Struct::StubResponse.new(test_queue_url))
    @sqs_client.get_queue_url(queue_name: "test_queue", queue_owner_aws_account_id: "123456789123") { @sqs_response }
    mock(Aws::SQS::Client).new(anything).once { @sqs_client }
    @real_poller = Aws::SQS::QueuePoller.new(test_queue_url, client: @sqs_client)
    @sqs_poller = stub(@real_poller)
    mock(Aws::SQS::QueuePoller).new(anything, client: @sqs_client) { @sqs_poller }
  end

  # Builds a stub SQS message whose body is a JSON document with a single
  # "Records" entry naming +key+ as the S3 object key.
  def s3_event_message(key)
    body = {
      "Records" => [
        {
          "s3" => {
            "object" => {
              "key" => key
            }
          }
        }
      ]
    }
    Struct::StubMessage.new(1, 1, Yajl.dump(body))
  end

  # Expects at least one @s3_bucket.object(key_matcher) call and answers it
  # with a double whose get.body evaluates the given block on every call,
  # so each read receives a fresh IO.
  def stub_s3_object(key_matcher = anything, &body)
    s3_object = stub(Object.new)
    s3_response = stub(Object.new)
    s3_response.body(&body)
    s3_object.get { s3_response }
    @s3_bucket.object(key_matcher).at_least(1) { s3_object }
  end

  # Stubs the poller's get_messages to return [message] on every call and to
  # set the plugin's @running flag to false once +stop_after+ requests have
  # been counted (presumably ending the plugin's polling loop).
  def stub_poller(driver, message, stop_after: 1)
    @sqs_poller.get_messages(anything, anything) do |config, stats|
      config.before_request.call(stats) if config.before_request
      stats.request_count += 1
      if stats.request_count >= stop_after
        driver.instance.instance_variable_set(:@running, false)
      end
      [message]
    end
  end

  # Returns a rewound StringIO containing one gzip stream per chunk,
  # written back to back.
  def gzip_io(*chunks)
    io = StringIO.new
    chunks.each do |chunk|
      Zlib::GzipWriter.wrap(io) do |gz|
        gz.write chunk
        gz.finish
      end
    end
    io.rewind
    io
  end

  def test_no_records
    setup_mocks
    d = create_driver(CONFIG + "\ncheck_apikey_on_start false\n")
    mock(d.instance).process(anything).never

    # The message body is an empty JSON object; process must never be called.
    message = Struct::StubMessage.new(1, 1, "{}")
    stub_poller(d, message, stop_after: 2)
    assert_nothing_raised do
      d.run {}
    end
  end

  def test_one_record
    setup_mocks
    d = create_driver(CONFIG + "\ncheck_apikey_on_start false\nstore_as text\nformat none\n")

    stub_s3_object { StringIO.new("aaa") }
    stub_poller(d, s3_event_message("test_key"))

    d.run(expect_emits: 1)
    events = d.events
    assert_equal({ "message" => "aaa" }, events.first[2])
  end

  def test_one_record_url_encoded
    setup_mocks
    d = create_driver(CONFIG + "\ncheck_apikey_on_start false\nstore_as text\nformat none\n")

    # The notification carries "test+key" while the bucket double only accepts
    # 'test key', so the key has to be URL-decoded before the lookup.
    stub_s3_object('test key') { StringIO.new("aaa") }
    stub_poller(d, s3_event_message("test+key"))

    d.run(expect_emits: 1)
    events = d.events
    assert_equal({ "message" => "aaa" }, events.first[2])
  end

  def test_one_record_multi_line
    setup_mocks
    d = create_driver(CONFIG + "\ncheck_apikey_on_start false\nstore_as text\nformat none\n")

    stub_s3_object { StringIO.new("aaa\nbbb\nccc\n") }
    stub_poller(d, s3_event_message("test_key"))

    d.run(expect_emits: 1)
    events = d.events
    expected_records = [
      { "message" => "aaa\n" },
      { "message" => "bbb\n" },
      { "message" => "ccc\n" }
    ]
    assert_equal(expected_records, events.map {|_tag, _time, record| record })
  end

  def test_gzip_single_stream
    setup_mocks
    d = create_driver(CONFIG + "\ncheck_apikey_on_start false\nstore_as gzip\nformat none\n")

    stub_s3_object { gzip_io("aaa\nbbb\n") }
    stub_poller(d, s3_event_message("test_key"))

    d.run(expect_emits: 1)
    events = d.events
    expected_records = [
      { "message" => "aaa\n" },
      { "message" => "bbb\n" }
    ]
    assert_equal(expected_records, events.map {|_tag, _time, record| record })
  end

  # Name kept as-is (including the "steams" spelling) so existing
  # name-based test filters keep matching.
  def test_gzip_multiple_steams
    setup_mocks
    d = create_driver(CONFIG + "\ncheck_apikey_on_start false\nstore_as gzip\nformat none\n")

    # Two gzip streams concatenated in one object; records from both are expected.
    stub_s3_object { gzip_io("aaa\nbbb\n", "ccc\nddd\n") }
    stub_poller(d, s3_event_message("test_key"))

    d.run(expect_emits: 1)
    events = d.events
    expected_records = [
      { "message" => "aaa\n" },
      { "message" => "bbb\n" },
      { "message" => "ccc\n" },
      { "message" => "ddd\n" }
    ]
    assert_equal(expected_records, events.map {|_tag, _time, record| record })
  end
end