# -*- coding: utf-8 -*-
require 'helper'
class DataCalculatorOutputTest < Test::Unit::TestCase
def setup
Fluent::Test.setup
end
CONFIG = %[
unit minute
aggregate tag
input_tag_remove_prefix test
formulas sum = amount * price, amounts = amount, record = 1
finalizer ave = amounts > 0 ? 1.0 * sum / amounts : 0
]
def create_driver(conf = CONFIG, tag='test.input')
Fluent::Test::OutputTestDriver.new(Fluent::DataCalculatorOutput, tag).configure(conf)
end
def test_configure
assert_raise(Fluent::ConfigError) {
d = create_driver('')
}
# 式がSyntax Error
assert_raise(Fluent::ConfigError) {
d = create_driver %[
formulas sum = 10 ab
]
}
# aggregateに必要な要素がない
assert_raise(Fluent::ConfigError) {
d = create_driver %[
aggregate keys
]
}
# finalizerに必要な要素がない
assert_raise(Fluent::ConfigError) {
d = create_driver %[
finalizer ave = cnt > 0 ? sum / cnt : 0
]
}
# finalizerに必要な要素がない
assert_raise(Fluent::ConfigError) {
d = create_driver %[
formulas sum = 10
finalizer ave = cnt > 0 ? sum / cnt : 0
]
}
d = create_driver %[
formulas sum = amount * price, cnt = amount
finalizer ave = cnt > 0 ? sum / cnt : 0
]
assert_equal 60, d.instance.tick
assert_equal :tag, d.instance.aggregate
assert_equal 'datacalculate', d.instance.tag
assert_nil d.instance.input_tag_remove_prefix
assert_equal 'sum = amount * price, cnt = amount', d.instance.formulas
assert_equal 'ave = cnt > 0 ? sum / cnt : 0', d.instance.finalizer
end
def test_count_initialized
d = create_driver %[
aggregate all
formulas sum = amount * price, cnt = amount
]
assert_equal [0,0], d.instance.counts['all']
end
def test_create_formula
d = create_driver %[
aggregate all
formulas sum = amount * price, cnt = amount
]
assert_equal 0, d.instance._formulas[0][0]
assert_equal 'sum', d.instance._formulas[0][1]
assert_equal ['amount', 'price'], d.instance._formulas[0][2]
assert_equal 1, d.instance._formulas[1][0]
assert_equal 'cnt', d.instance._formulas[1][1]
assert_equal ['amount'], d.instance._formulas[1][2]
end
def test_countups
d = create_driver
assert_nil d.instance.counts['test.input']
d.instance.countups('test.input', [0, 0, 0])
assert_equal [0,0,0], d.instance.counts['test.input']
d.instance.countups('test.input', [1, 1, 0])
assert_equal [1,1,0], d.instance.counts['test.input']
d.instance.countups('test.input', [5, 1, 0])
assert_equal [6,2,0], d.instance.counts['test.input']
end
def test_stripped_tag
d = create_driver
assert_equal 'input', d.instance.stripped_tag('test.input')
assert_equal 'test.input', d.instance.stripped_tag('test.test.input')
assert_equal 'input', d.instance.stripped_tag('input')
end
def test_aggregate_keys
d = create_driver %[
aggregate keys area_id, mission_id
formulas sum = amount * price, cnt = amount
]
assert_equal 'keys', d.instance.aggregate
assert_equal ['area_id', 'mission_id'], d.instance.aggregate_keys
end
def test_generate_output
d = create_driver
r1 = d.instance.generate_output({'test.input' => [240,120,180], 'test.input2' => [600,0,0]}, 60)[0]
assert_equal 240, r1['input_sum']
assert_equal 120, r1['input_amounts']
assert_equal 180, r1['input_record']
assert_equal 2, r1['input_ave']
assert_equal 600, r1['input2_sum']
assert_equal 0, r1['input2_amounts']
assert_equal 0, r1['input2_record']
assert_equal 0, r1['input2_ave']
d = create_driver %[
aggregate all
input_tag_remove_prefix test
formulas sum = amount * price, amounts = amount, record = 1
finalizer ave = amounts > 0 ? sum / amounts : 0
]
r2 = d.instance.generate_output({'all' => [240,120,180]}, 60)[0]
assert_equal 240, r2['sum']
assert_equal 120, r2['amounts']
assert_equal 180, r2['record']
assert_equal 2, r2['ave']
end
def test_emit
d1 = create_driver(CONFIG, 'test.input')
d1.run do
60.times do
d1.emit({'amount' => 3, 'price' => 100})
d1.emit({'amount' => 3, 'price' => 200})
d1.emit({'amount' => 6, 'price' => 50})
d1.emit({'amount' => 10, 'price' => 100})
end
end
r1 = d1.instance.flush(60)[0]
assert_equal 132000, r1['input_sum']
assert_equal 1320, r1['input_amounts']
assert_equal 240, r1['input_record']
assert_equal 100.0, r1['input_ave']
d2 = create_driver(%[
unit minute
aggregate all
input_tag_remove_prefix test
formulas sum = amount * price, amounts = amount, record = 1
finalizer ave = amounts > 0 ? 1.0 * sum / amounts : 0
], 'test.input2')
d2.run do
60.times do
d2.emit({'amount' => 3, 'price' => 100})
d2.emit({'amount' => 3, 'price' => 200})
d2.emit({'amount' => 6, 'price' => 50})
d2.emit({'amount' => 10, 'price' => 100})
end
end
r2 = d2.instance.flush(60)[0]
assert_equal 132000, r2['sum']
assert_equal 1320, r2['amounts']
assert_equal 240, r2['record']
assert_equal 100.0, r2['ave']
d3 = create_driver(%[
unit minute
aggregate keys area_id, mission_id
formulas sum = amount * price, count = 1
type stdout
], 'test.input3')
sums = {}
counts = {}
d3.run do
240.times do
area_id = rand(5)
mission_id = rand(5)
amount = rand(10)
price = rand(5) * 100
pat = [area_id, mission_id].join(',')
d3.emit({'amount' => amount, 'price' => price, 'area_id' => area_id, 'mission_id' => mission_id})
sums[pat] = 0 unless sums.has_key?(pat)
counts[pat] = 0 unless counts.has_key?(pat)
sums[pat] += amount * price
counts[pat] += 1
end
end
r3 = d3.instance.flush(60)
r3.each do |r|
pat = [r['area_id'], r['mission_id']].join(',')
assert_equal sums[pat], r['sum']
assert_equal counts[pat], r['count']
end
d4 = create_driver(%[
unit minute
aggregate keys area_id, mission_id
formulas sum = amount * price, count = 1
type stdout
], 'test.input3')
sums = {}
counts = {}
d4.run do
240.times do
area_id = 'area_'+rand(5).to_s
mission_id = 'mission_'+rand(5).to_s
amount = rand(10)
price = rand(5) * 100
pat = [area_id, mission_id].join('_$_')
d4.emit({'amount' => amount, 'price' => price, 'area_id' => area_id, 'mission_id' => mission_id})
sums[pat] = 0 unless sums.has_key?(pat)
counts[pat] = 0 unless counts.has_key?(pat)
sums[pat] += amount * price
counts[pat] += 1
end
end
r4 = d4.instance.flush(60)
r4.each do |r|
pat = [r['area_id'], r['mission_id']].join('_$_')
assert_equal sums[pat], r['sum']
assert_equal counts[pat], r['count']
end
end
end