test/embulk/input/test_mixpanel.rb in embulk-input-mixpanel-0.5.8 vs test/embulk/input/test_mixpanel.rb in embulk-input-mixpanel-0.5.9

- old
+ new

@@ -14,10 +14,11 @@ TO_DATE = "2015-03-02".freeze DAYS = 8 DATES = Date.parse(FROM_DATE)..(Date.parse(FROM_DATE) + DAYS - 1) TIMEZONE = "Asia/Tokyo".freeze JOB_START_TIME = 1506407051000 + UPPER_LIMIT_DELAY = 300 # 300 seconds delay DURATIONS = [ {from_date: FROM_DATE, to_date: "2015-02-28"}, # It has 7 days between 2015-02-22 and 2015-02-28 {from_date: "2015-03-01", to_date: TO_DATE}, ] @@ -44,13 +45,17 @@ stub(klass).export(params) { records } end end end def satisfy_task_ignore_start_time(expected_task) - satisfy{|input_task| + satisfy {|input_task| assert_not_nil(input_task[:job_start_time]) - assert_equal(expected_task, input_task.merge(job_start_time: expected_task[:job_start_time])) + # This will also verify incremental column upper limit + assert_equal(input_task[:incremental_column_upper_limit], input_task[:job_start_time] - UPPER_LIMIT_DELAY * 1000) + expected_task[:job_start_time] = input_task[:job_start_time] + expected_task[:incremental_column_upper_limit] = input_task[:incremental_column_upper_limit] + assert_equal(expected_task, input_task) true } end def setup_logger stub(Embulk).logger { ::Logger.new(IO::NULL) } @@ -648,11 +653,11 @@ adjusted = record_epoch - timezone_offset_seconds mock(page_builder).add(["FOO", adjusted, "event"]).times(records.length * 2) mock(page_builder).finish any_instance_of(MixpanelApi::Client) do |klass| stub(klass).export() do |params, block| - assert_equal("(abc==def) and properties[\"mp_processing_time_ms\"] > 1 and properties[\"mp_processing_time_ms\"] < #{JOB_START_TIME}",params["where"]) + assert_equal("(abc==def) and properties[\"mp_processing_time_ms\"] > 1 and properties[\"mp_processing_time_ms\"] < #{JOB_START_TIME - UPPER_LIMIT_DELAY * 1000}",params["where"]) records.each{|record| block.call(record) } end end task_report = plugin.run assert_equal(1234567919, task_report[:latest_fetched_time]) @@ -665,11 +670,11 @@ adjusted = record_epoch - timezone_offset_seconds mock(page_builder).add(["FOO", adjusted, "event"]).times(records.length * 2) mock(page_builder).finish any_instance_of(MixpanelApi::Client) do |klass| stub(klass).export() do |params, block| - assert_equal("properties[\"mp_processing_time_ms\"] > 1 and properties[\"mp_processing_time_ms\"] < #{JOB_START_TIME}",params["where"]) + assert_equal("properties[\"mp_processing_time_ms\"] > 1 and properties[\"mp_processing_time_ms\"] < #{JOB_START_TIME - UPPER_LIMIT_DELAY * 1000}",params["where"]) records.each{|record| block.call(record) } end end task_report = plugin.run assert_equal(1234567919, task_report[:latest_fetched_time]) @@ -826,11 +831,12 @@ fetch_custom_properties: false, retry_initial_wait_sec: 2, retry_limit: 3, latest_fetched_time: 0, slice_range: 7, - job_start_time: JOB_START_TIME + job_start_time: JOB_START_TIME, + incremental_column_upper_limit: (JOB_START_TIME - UPPER_LIMIT_DELAY * 1000) } end def records [ @@ -863,9 +869,10 @@ fetch_unknown_columns: false, fetch_custom_properties: false, retry_initial_wait_sec: 2, retry_limit: 3, latest_fetched_time: 0, + incremental_column_upper_limit_delay_in_seconds: UPPER_LIMIT_DELAY } end def embulk_config DataSource[*config.to_a.flatten(1)]