test/embulk/input/test_mixpanel.rb in embulk-input-mixpanel-0.5.2 vs test/embulk/input/test_mixpanel.rb in embulk-input-mixpanel-0.5.3.alpha.1
- old
+ new
@@ -114,11 +114,11 @@
Mixpanel.guess(embulk_config(config))
end
def test_json_type
sample_records = records.map do |r|
- r.merge("properties" => {"array" => [1,2], "hash" => {foo: "FOO"}})
+ r.merge("properties" => {"time" => 1, "array" => [1, 2], "hash" => {foo: "FOO"}})
end
actual = Mixpanel.guess_from_records(sample_records)
assert actual.include?(name: "array", type: :json)
assert actual.include?(name: "hash", type: :json)
end
@@ -191,10 +191,18 @@
assert_raise(Embulk::ConfigError) do
Mixpanel.transaction(transaction_config((Date.today - 1).to_s).merge(fetch_days: -1))
end
end
+ def test_default_configuration
+ stub(Mixpanel).resume {|task|
+ assert_nil(task[:incremental_column])
+ assert_true(task[:incremental])
+ }
+ Mixpanel.transaction(transaction_config(Date.today))
+ end
+
private
def transaction_config(from_date)
_config = config.merge(
from_date: from_date,
@@ -280,10 +288,12 @@
def transaction_task(timezone)
task.merge(
dates: DATES.map {|date| date.to_s},
api_key: API_KEY,
api_secret: API_SECRET,
+ incremental: true,
+ incremental_column: nil,
timezone: timezone,
schema: schema
)
end
@@ -302,10 +312,29 @@
mock(Mixpanel).resume(transaction_task(days), columns, 1, &control)
Mixpanel.transaction(transaction_config(days), &control)
end
+ def test_valid_days_with_backfill
+ days = 5
+
+ stub(Mixpanel).resume() do |task|
+ assert_equal(["2015-02-17", "2015-02-18", "2015-02-19", "2015-02-20", "2015-02-21", "2015-02-22", "2015-02-23", "2015-02-24", "2015-02-25", "2015-02-26"], task[:dates])
+ end
+ config=transaction_config(days).merge("back_fill_days" => 5, "incremental_column" => "test_column", "latest_fetched_time" => 1501599491000)
+ Mixpanel.transaction(config, &control)
+ end
+
+ def test_valid_days_with_backfill_first_run
+ days = 5
+ stub(Mixpanel).resume() do |task|
+ assert_equal(transaction_task(days)[:dates], task[:dates])
+ end
+ config=transaction_config(days).merge("back_fill_days" => 5, "incremental_column" => "test_column")
+ Mixpanel.transaction(config, &control)
+ end
+
def test_invalid_days
days = 0
assert_raise(Embulk::ConfigError) do
Mixpanel.transaction(transaction_config(days), &control)
@@ -547,10 +576,100 @@
mock(@page_builder).finish
@plugin.run
end
+ class NonIncrementalRunTest < self
+
+ def test_non_incremental_run
+
+ mock(@page_builder).add(anything).times(records.length * 2)
+ mock(@page_builder).finish
+ task_report = @plugin.run
+ assert_equal(0, task_report[:latest_fetched_time])
+ end
+
+ def task
+ super.merge(incremental: false)
+ end
+
+ end
+
+ class IncrementalRunTest < self
+
+ def test_incremental_run
+ dont_allow(mock(@page_builder)).add(anything)
+ mock(@page_builder).finish
+ task_report = @plugin.run
+ assert_equal(record_epoch+1, task_report[:latest_fetched_time])
+ end
+
+ def task
+ super.merge(incremental: true, latest_fetched_time: record_epoch+1)
+ end
+
+ end
+
+ class IncrementalColumnTest < self
+
+ def setup
+ end
+
+ def test_incremental_column_with_where
+ page_builder = Object.new
+ plugin = Mixpanel.new(task.merge(params: task[:params].merge("where" => "abc==def")), nil, nil, page_builder)
+ stub(plugin).preview? {false}
+ adjusted = record_epoch - timezone_offset_seconds
+ mock(page_builder).add(["FOO", adjusted, "event"]).times(records.length * 2)
+ mock(page_builder).finish
+ any_instance_of(MixpanelApi::Client) do |klass|
+ stub(klass).export() do |params, block|
+ assert_equal('(abc==def) and properties["mp_processing_time_ms"] > 0',params["where"])
+ records.each{|record| block.call(record) }
+ end
+ end
+ task_report = plugin.run
+ assert_equal(1234567919, task_report[:latest_fetched_time])
+ end
+
+ def test_incremental_column
+ page_builder = Object.new
+ plugin = Mixpanel.new(task, nil, nil, page_builder)
+ stub(plugin).preview? {false}
+ adjusted = record_epoch - timezone_offset_seconds
+ mock(page_builder).add(["FOO", adjusted, "event"]).times(records.length * 2)
+ mock(page_builder).finish
+ any_instance_of(MixpanelApi::Client) do |klass|
+ stub(klass).export() do |params, block|
+ assert_equal('properties["mp_processing_time_ms"] > 0',params["where"])
+ records.each{|record| block.call(record) }
+ end
+ end
+ task_report = plugin.run
+ assert_equal(1234567919, task_report[:latest_fetched_time])
+ end
+
+ def records
+ super.each_with_index.map {|record, i|
+ record['properties']['mp_processing_time_ms'] = record_epoch+i
+ record
+ }
+ end
+
+ def schema
+ [
+ {"name" => "foo", "type" => "string"},
+ {"name" => "time", "type" => "integer"},
+ {"name" => "event", "type" => "string"},
+ ]
+ end
+
+ def task
+ super.merge(incremental_column: 'mp_processing_time_ms')
+ end
+ end
+
class CustomPropertiesTest < self
def setup
super
@page_builder = Object.new
@plugin = Mixpanel.new(task, nil, nil, @page_builder)
@@ -669,9 +788,11 @@
def task
{
api_key: API_KEY,
api_secret: API_SECRET,
timezone: TIMEZONE,
+ incremental: true,
+ incremental_column: nil,
schema: schema,
dates: DATES.to_a.map(&:to_s),
params: Mixpanel.export_params(embulk_config),
fetch_unknown_columns: false,
fetch_custom_properties: false,