require 'helper'
require 'fluent/plugin/out_azure-storage-append-blob.rb'
require 'azure/core/http/http_response'
require 'azure/core/http/http_error'
require 'stringio'
require 'zlib'
include Fluent::Test::Helpers
class AzureStorageAppendBlobOutTest < Test::Unit::TestCase
# Run Fluentd's test setup hook before each test in this class.
setup { Fluent::Test.setup }
# Default plugin configuration: storage account + access key authentication
# against the AZUREGERMANCLOUD cloud. create_driver uses it when no conf is
# passed, and several tests extend it with extra settings.
CONFIG = %(
azure_cloud AZUREGERMANCLOUD
azure_storage_account test_storage_account
azure_storage_access_key MY_FAKE_SECRET
azure_container test_container
time_slice_format %Y%m%d-%H
path log
).freeze
# Configuration that supplies a storage connection string instead of an
# account name, access key and container.
CONNSTR_CONFIG = %(
azure_storage_connection_string https://test
time_slice_format %Y%m%d-%H
path log
).freeze
# Configuration with neither an access key nor a connection string; the
# "managed identity" config test expects the plugin to set @use_msi for it.
# It also overrides the IMDS API version and the token refresh interval.
MSI_CONFIG = %(
azure_storage_account test_storage_account
azure_container test_container
azure_imds_api_version 1970-01-01
azure_token_refresh_interval 120
time_slice_format %Y%m%d-%H
path log
).freeze
# Access-key configuration for AZURESTACKCLOUD. It sets an explicit
# azure_storage_dns_suffix, which the config tests show is mandatory for this
# cloud.
AZURESTACKCLOUD_CONFIG = %(
azure_cloud AZURESTACKCLOUD
azure_storage_dns_suffix test.storage.dns.suffix
azure_storage_account test_storage_account
azure_storage_access_key MY_FAKE_SECRET
azure_container test_container
time_slice_format %Y%m%d-%H
path log
).freeze
# Build an output test driver for the plugin, configured with +conf+.
#
# After configuration, +service+ is injected as the plugin's @bs instance
# variable (tests pass a FakeBlobService here) and @azure_storage_path is
# pinned to 'storage_path'.
#
# Returns the configured driver.
def create_driver(conf: CONFIG, service: nil)
  Fluent::Test::Driver::Output
    .new(Fluent::Plugin::AzureStorageAppendBlobOut)
    .configure(conf)
    .tap do |driver|
      plugin = driver.instance
      plugin.instance_variable_set(:@bs, service)
      plugin.instance_variable_set(:@azure_storage_path, 'storage_path')
    end
end
sub_test_case 'test config' do
  # Assert that configuring the plugin with +conf+ raises a Fluent::ConfigError
  # carrying exactly +message+.
  def assert_config_rejected(message, conf)
    assert_raise(Fluent::ConfigError.new(message)) { create_driver(conf: conf) }
  end

  test 'config should reject with no azure container' do
    conf_without_container = [
      '',
      'azure_storage_account test_storage_account',
      'azure_storage_access_key MY_FAKE_SECRET',
      'time_slice_format %Y%m%d-%H',
      'time_slice_wait 10m',
      'path log',
      ''
    ].join("\n")

    assert_config_rejected('azure_container needs to be specified', conf_without_container)
  end

  test 'config should reject for invalid cloud ' do
    assert_config_rejected(
      'azure_cloud invalid, must be either of AZURECHINACLOUD, AZUREGERMANCLOUD, AZUREPUBLICCLOUD, AZUREUSGOVERNMENTCLOUD, AZURESTACKCLOUD',
      "\nazure_cloud INVALIDCLOUD\n"
    )
  end

  test 'config should reject for Azure Stack Cloud with no azure storage dns suffix' do
    assert_config_rejected(
      'azure_storage_dns_suffix invalid, must not be empty for AZURESTACKCLOUD',
      "\nazure_cloud AZURESTACKCLOUD\n"
    )
  end

  test 'config with access key should set instance variables' do
    plugin = create_driver.instance

    assert_equal 'core.cloudapi.de', plugin.instance_variable_get(:@azure_storage_dns_suffix)
    assert_equal 'test_storage_account', plugin.azure_storage_account
    assert_equal 'MY_FAKE_SECRET', plugin.azure_storage_access_key
    assert_equal 'test_container', plugin.azure_container
    assert_equal true, plugin.auto_create_container
    assert_equal '%{path}%{time_slice}-%{index}.log', plugin.azure_object_key_format
  end

  test 'config with managed identity enabled should set instance variables' do
    plugin = create_driver(conf: MSI_CONFIG).instance

    assert_equal 'test_storage_account', plugin.azure_storage_account
    assert_equal 'test_container', plugin.azure_container
    assert_equal true, plugin.instance_variable_get(:@use_msi)
    assert_equal true, plugin.auto_create_container
    assert_equal '%{path}%{time_slice}-%{index}.log', plugin.azure_object_key_format
    assert_equal 120, plugin.azure_token_refresh_interval
    assert_equal '1970-01-01', plugin.azure_imds_api_version
  end

  test 'config with connection string should set instance variables' do
    plugin = create_driver(conf: CONNSTR_CONFIG).instance

    assert_equal 'https://test', plugin.azure_storage_connection_string
    assert_equal false, plugin.instance_variable_get(:@use_msi)
    assert_equal true, plugin.auto_create_container
  end

  test 'config for Azure Stack Cloud should set instance variables' do
    plugin = create_driver(conf: AZURESTACKCLOUD_CONFIG).instance

    assert_equal 'test.storage.dns.suffix', plugin.instance_variable_get(:@azure_storage_dns_suffix)
    assert_equal 'test_storage_account', plugin.azure_storage_account
    assert_equal 'MY_FAKE_SECRET', plugin.azure_storage_access_key
    assert_equal 'test_container', plugin.azure_container
    assert_equal true, plugin.auto_create_container
    assert_equal '%{path}%{time_slice}-%{index}.log', plugin.azure_object_key_format
  end
end
# Checks that the plugin's @path_slicer expands strftime placeholders in the
# configured path.
#
# NOTE(review): the slicer and the assertion each sample the clock on their
# own, so a run that straddles midnight UTC could presumably fail — confirm
# whether that matters for CI.
sub_test_case 'test path slicing' do
  test 'test path_slicing' do
    # gsub returns a new string, so the frozen CONFIG needs no clone first.
    config = CONFIG.gsub(/path\slog/, 'path log/%Y/%m/%d')
    d = create_driver conf: config
    path_slicer = d.instance.instance_variable_get(:@path_slicer)
    path = d.instance.instance_variable_get(:@path)
    slice = path_slicer.call(path)
    # test-unit's assert_equal takes (expected, actual); keeping that order
    # makes the failure message label the two values correctly.
    assert_equal Time.now.utc.strftime('log/%Y/%m/%d'), slice
  end
  test 'path slicing utc' do
    config = CONFIG.gsub(/path\slog/, 'path log/%Y/%m/%d')
    # Safe to mutate: gsub produced a fresh, unfrozen string.
    config << "\nutc\n"
    d = create_driver conf: config
    path_slicer = d.instance.instance_variable_get(:@path_slicer)
    path = d.instance.instance_variable_get(:@path)
    slice = path_slicer.call(path)
    assert_equal Time.now.utc.strftime('log/%Y/%m/%d'), slice
  end
end
# Verifies how the `compress` setting is reflected in the plugin's @compress.
sub_test_case 'compress options' do
  test 'compress default value' do
    plugin = create_driver.instance
    assert_equal false, plugin.instance_variable_get(:@compress)
  end

  test 'compress set true' do
    plugin = create_driver(conf: "#{CONFIG}\ncompress true\n").instance
    assert_equal true, plugin.instance_variable_get(:@compress)
  end
end
# Minimal stand-in for a raw HTTP response. It exists so that an
# Azure::Core::Http::HTTPError can be built in tests: HTTPError parses a
# response object when it is created.
class FakeResponse
  attr_reader :status, :body, :headers

  # status - the HTTP status code to report (404 when omitted).
  def initialize(status = 404)
    @headers = {}
    @body = 'body'
    @status = status
  end
end
# Decompress the gzip data captured from a sequence of appended blocks.
#
# The blocks are joined into a single byte string. That string may hold
# several gzip members back to back; each member is inflated in turn and the
# resulting text is concatenated.
#
# @param blocks [Array<String>] block payloads, in the order they were appended
# @return [String] the concatenated uncompressed text; '' when there is no data
# @raise [Zlib::GzipFile::Error] if the remaining bytes are not valid gzip
def uncompress_blocks(blocks)
  remaining = blocks.join
  uncompressed_data = +''
  # GzipReader stops at the end of one member; #unused returns the bytes that
  # follow it (nil once everything is consumed). Also stop on an empty
  # remainder so that an empty block list yields '' instead of a
  # "not in gzip format" error.
  until remaining.nil? || remaining.empty?
    Zlib::GzipReader.wrap(StringIO.new(remaining.b)) do |gz|
      uncompressed_data << gz.read
      remaining = gz.unused
    end
  end
  uncompressed_data
end
# End-to-end checks of the compressed write path against FakeBlobService.
#
# NOTE(review): the expected block counts below depend on the size of the gzip
# output and on the AZURE_BLOCK_SIZE_LIMIT override of 10 made in this file —
# confirm they are stable across zlib versions.
sub_test_case 'test append blob for compress' do
  # Run a compress-enabled driver over +events+ (pairs of time string and
  # record) and return the fake blob service that captured the appended blocks.
  def run_compressed(events)
    svc = FakeBlobService.new(200)
    d = create_driver(conf: "#{CONFIG}\ncompress true\ntimekey 5\ntimekey_wait 0\n", service: svc)
    d.run(default_tag: 'test') do
      events.each { |time, record| d.feed(event_time(time), record) }
    end
    svc
  end

  test 'compress 2 events 1 chunk' do
    svc = run_compressed(
      [
        ['2011-01-02 13:14:15 UTC', { a: 1 }],
        ['2011-01-02 13:14:15 UTC', { a: 2 }]
      ]
    )
    expected = [
      "2011-01-02T13:14:15+00:00\ttest\t{\"a\":1}\n",
      "2011-01-02T13:14:15+00:00\ttest\t{\"a\":2}\n"
    ].join

    assert_equal(expected, uncompress_blocks(svc.blocks))
    assert_equal(10, svc.blocks.size)
  end

  test 'compress 2 events 2 chunk' do
    svc = run_compressed(
      [
        ['2011-01-02 13:14:00 UTC', { a: 1 }],
        ['2011-01-02 13:14:15 UTC', { a: 2 }] # after 15 sec
      ]
    )
    expected = [
      "2011-01-02T13:14:00+00:00\ttest\t{\"a\":1}\n",
      "2011-01-02T13:14:15+00:00\ttest\t{\"a\":2}\n"
    ].join

    assert_equal(expected, uncompress_blocks(svc.blocks))
    assert_equal(20, svc.blocks.size)
  end
end
# Test double for the blob service, used to exercise the plugin functions that
# talk to it. It records every appended payload and answers container lookups
# according to the HTTP status it was created with.
class FakeBlobService
  # Payloads passed to append_blob_block, in call order.
  attr_reader :blocks

  # status - HTTP status code the fake response reports.
  def initialize(status)
    @blocks = []
    @response = Azure::Core::Http::HttpResponse.new(FakeResponse.new(status))
  end

  # Record +data+ instead of sending it anywhere; the other arguments are ignored.
  def append_blob_block(_container, _path, data, _options = {})
    @blocks << data
  end

  # Does nothing for a 200 status; otherwise raises an HTTPError built from
  # the fake response.
  def get_container_properties(_container)
    return if @response.status_code == 200

    raise Azure::Core::Http::HTTPError.new(@response)
  end
end
# container_exists? is driven by the status FakeBlobService was created with.
sub_test_case 'test container_exists' do
  test 'container 404 returns false' do
    plugin = create_driver(service: FakeBlobService.new(404)).instance
    assert_false plugin.container_exists?('anything')
  end

  test 'existing container returns true' do
    plugin = create_driver(service: FakeBlobService.new(200)).instance
    assert_true plugin.container_exists?('anything')
  end

  test 'unexpected exception raises' do
    plugin = create_driver(service: FakeBlobService.new(500)).instance
    assert_raise_kind_of(Azure::Core::Http::HTTPError) do
      plugin.container_exists?('anything')
    end
  end
end
# Override the block size limit so that mocked requests do not require huge buffers.
# Any constant already defined directly on the plugin class is removed first:
# assigning over an existing constant makes Ruby print an
# "already initialized constant" warning on every test run.
Fluent::Plugin::AzureStorageAppendBlobOut.class_eval do
  remove_const(:AZURE_BLOCK_SIZE_LIMIT) if const_defined?(:AZURE_BLOCK_SIZE_LIMIT, false)
  const_set(:AZURE_BLOCK_SIZE_LIMIT, 10)
end
# Checks how append_blob splits content into append requests, given the
# AZURE_BLOCK_SIZE_LIMIT read from the plugin class.
sub_test_case 'test append blob buffering' do
  # Append +content+ through the plugin against a fake blob service and return
  # the list of payloads the service received.
  def fake_appended_blocks(content)
    FakeBlobService.new(200).tap do |svc|
      create_driver(service: svc).instance.send(:append_blob, content, nil)
    end.blocks
  end

  test 'short buffer appends once' do
    payload = '123456789'
    assert_equal [payload], fake_appended_blocks(payload)
  end

  test 'single character appends once' do
    payload = '1'
    assert_equal [payload], fake_appended_blocks(payload)
  end

  test 'empty appends once' do
    assert_equal [''], fake_appended_blocks('')
  end

  test 'long buffer appends multiple times' do
    limit = Fluent::Plugin::AzureStorageAppendBlobOut::AZURE_BLOCK_SIZE_LIMIT
    full_block = 'a' * limit
    remainder = 'a' * 3
    assert_equal [full_block, remainder], fake_appended_blocks(full_block + remainder)
  end
end
end