test/embulk/input/test_mixpanel.rb in embulk-input-mixpanel-0.5.2 vs test/embulk/input/test_mixpanel.rb in embulk-input-mixpanel-0.5.3.alpha.1

- old
+ new

@@ -114,11 +114,11 @@ Mixpanel.guess(embulk_config(config)) end def test_json_type sample_records = records.map do |r| - r.merge("properties" => {"array" => [1,2], "hash" => {foo: "FOO"}}) + r.merge("properties" => {"time" => 1, "array" => [1, 2], "hash" => {foo: "FOO"}}) end actual = Mixpanel.guess_from_records(sample_records) assert actual.include?(name: "array", type: :json) assert actual.include?(name: "hash", type: :json) end @@ -191,10 +191,18 @@ assert_raise(Embulk::ConfigError) do Mixpanel.transaction(transaction_config((Date.today - 1).to_s).merge(fetch_days: -1)) end end + def test_default_configuration + stub(Mixpanel).resume {|task| + assert_nil(task[:incremental_column]) + assert_true(task[:incremental]) + } + Mixpanel.transaction(transaction_config(Date.today)) + end + private def transaction_config(from_date) _config = config.merge( from_date: from_date, @@ -280,10 +288,12 @@ def transaction_task(timezone) task.merge( dates: DATES.map {|date| date.to_s}, api_key: API_KEY, api_secret: API_SECRET, + incremental: true, + incremental_column: nil, timezone: timezone, schema: schema ) end @@ -302,10 +312,29 @@ mock(Mixpanel).resume(transaction_task(days), columns, 1, &control) Mixpanel.transaction(transaction_config(days), &control) end + def test_valid_days_with_backfill + days = 5 + + stub(Mixpanel).resume() do |task| + assert_equal(["2015-02-17", "2015-02-18", "2015-02-19", "2015-02-20", "2015-02-21", "2015-02-22", "2015-02-23", "2015-02-24", "2015-02-25", "2015-02-26"], task[:dates]) + end + config=transaction_config(days).merge("back_fill_days" => 5, "incremental_column" => "test_column", "latest_fetched_time" => 1501599491000) + Mixpanel.transaction(config, &control) + end + + def test_valid_days_with_backfill_first_run + days = 5 + stub(Mixpanel).resume() do |task| + assert_equal(transaction_task(days)[:dates], task[:dates]) + end + config=transaction_config(days).merge("back_fill_days" => 5, "incremental_column" => "test_column") + Mixpanel.transaction(config, &control) + end + def test_invalid_days days = 0 assert_raise(Embulk::ConfigError) do Mixpanel.transaction(transaction_config(days), &control) @@ -547,10 +576,100 @@ mock(@page_builder).finish @plugin.run end + class NonIncrementalRunTest < self + + def test_non_incremental_run + + mock(@page_builder).add(anything).times(records.length * 2) + mock(@page_builder).finish + task_report = @plugin.run + assert_equal(0, task_report[:latest_fetched_time]) + end + + def task + super.merge(incremental: false) + end + + end + + class IncrementalRunTest < self + + def test_incremental_run + dont_allow(mock(@page_builder)).add(anything) + mock(@page_builder).finish + task_report = @plugin.run + assert_equal(record_epoch+1, task_report[:latest_fetched_time]) + end + + def task + super.merge(incremental: true, latest_fetched_time: record_epoch+1) + end + + end + + class IncrementalColumnTest < self + + def setup + end + + def test_incremental_column_with_where + page_builder = Object.new + plugin = Mixpanel.new(task.merge(params: task[:params].merge("where" => "abc==def")), nil, nil, page_builder) + stub(plugin).preview? {false} + adjusted = record_epoch - timezone_offset_seconds + mock(page_builder).add(["FOO", adjusted, "event"]).times(records.length * 2) + mock(page_builder).finish + any_instance_of(MixpanelApi::Client) do |klass| + stub(klass).export() do |params, block| + assert_equal('(abc==def) and properties["mp_processing_time_ms"] > 0',params["where"]) + records.each{|record| block.call(record) } + end + end + task_report = plugin.run + assert_equal(1234567919, task_report[:latest_fetched_time]) + end + + def test_incremental_column + page_builder = Object.new + plugin = Mixpanel.new(task, nil, nil, page_builder) + stub(plugin).preview? {false} + adjusted = record_epoch - timezone_offset_seconds + mock(page_builder).add(["FOO", adjusted, "event"]).times(records.length * 2) + mock(page_builder).finish + any_instance_of(MixpanelApi::Client) do |klass| + stub(klass).export() do |params, block| + assert_equal('properties["mp_processing_time_ms"] > 0',params["where"]) + records.each{|record| block.call(record) } + end + end + task_report = plugin.run + assert_equal(1234567919, task_report[:latest_fetched_time]) + end + + def records + super.each_with_index.map {|record, i| + record['properties']['mp_processing_time_ms'] = record_epoch+i + record + } + end + + def schema + [ + {"name" => "foo", "type" => "string"}, + {"name" => "time", "type" => "integer"}, + {"name" => "event", "type" => "string"}, + ] + end + + def task + super.merge(incremental_column: 'mp_processing_time_ms') + end + end + class CustomPropertiesTest < self def setup super @page_builder = Object.new @plugin = Mixpanel.new(task, nil, nil, @page_builder) @@ -669,9 +788,11 @@ def task { api_key: API_KEY, api_secret: API_SECRET, timezone: TIMEZONE, + incremental: true, + incremental_column: nil, schema: schema, dates: DATES.to_a.map(&:to_s), params: Mixpanel.export_params(embulk_config), fetch_unknown_columns: false, fetch_custom_properties: false,