test/embulk/input/test_mixpanel.rb in embulk-input-mixpanel-0.5.8 vs test/embulk/input/test_mixpanel.rb in embulk-input-mixpanel-0.5.9
- old
+ new
@@ -14,10 +14,11 @@
TO_DATE = "2015-03-02".freeze
DAYS = 8
DATES = Date.parse(FROM_DATE)..(Date.parse(FROM_DATE) + DAYS - 1)
TIMEZONE = "Asia/Tokyo".freeze
JOB_START_TIME = 1506407051000
+ UPPER_LIMIT_DELAY = 300 # 300 seconds delay
DURATIONS = [
{from_date: FROM_DATE, to_date: "2015-02-28"}, # It has 7 days between 2015-02-22 and 2015-02-28
{from_date: "2015-03-01", to_date: TO_DATE},
]
@@ -44,13 +45,17 @@
stub(klass).export(params) { records }
end
end
end
def satisfy_task_ignore_start_time(expected_task)
- satisfy{|input_task|
+ satisfy {|input_task|
assert_not_nil(input_task[:job_start_time])
- assert_equal(expected_task, input_task.merge(job_start_time: expected_task[:job_start_time]))
+ # This will also verify incremental column upper limit
+ assert_equal(input_task[:incremental_column_upper_limit], input_task[:job_start_time] - UPPER_LIMIT_DELAY * 1000)
+ expected_task[:job_start_time] = input_task[:job_start_time]
+ expected_task[:incremental_column_upper_limit] = input_task[:incremental_column_upper_limit]
+ assert_equal(expected_task, input_task)
true
}
end
def setup_logger
stub(Embulk).logger { ::Logger.new(IO::NULL) }
@@ -648,11 +653,11 @@
adjusted = record_epoch - timezone_offset_seconds
mock(page_builder).add(["FOO", adjusted, "event"]).times(records.length * 2)
mock(page_builder).finish
any_instance_of(MixpanelApi::Client) do |klass|
stub(klass).export() do |params, block|
- assert_equal("(abc==def) and properties[\"mp_processing_time_ms\"] > 1 and properties[\"mp_processing_time_ms\"] < #{JOB_START_TIME}",params["where"])
+ assert_equal("(abc==def) and properties[\"mp_processing_time_ms\"] > 1 and properties[\"mp_processing_time_ms\"] < #{JOB_START_TIME - UPPER_LIMIT_DELAY * 1000}",params["where"])
records.each{|record| block.call(record) }
end
end
task_report = plugin.run
assert_equal(1234567919, task_report[:latest_fetched_time])
@@ -665,11 +670,11 @@
adjusted = record_epoch - timezone_offset_seconds
mock(page_builder).add(["FOO", adjusted, "event"]).times(records.length * 2)
mock(page_builder).finish
any_instance_of(MixpanelApi::Client) do |klass|
stub(klass).export() do |params, block|
- assert_equal("properties[\"mp_processing_time_ms\"] > 1 and properties[\"mp_processing_time_ms\"] < #{JOB_START_TIME}",params["where"])
+ assert_equal("properties[\"mp_processing_time_ms\"] > 1 and properties[\"mp_processing_time_ms\"] < #{JOB_START_TIME - UPPER_LIMIT_DELAY * 1000}",params["where"])
records.each{|record| block.call(record) }
end
end
task_report = plugin.run
assert_equal(1234567919, task_report[:latest_fetched_time])
@@ -826,11 +831,12 @@
fetch_custom_properties: false,
retry_initial_wait_sec: 2,
retry_limit: 3,
latest_fetched_time: 0,
slice_range: 7,
- job_start_time: JOB_START_TIME
+ job_start_time: JOB_START_TIME,
+ incremental_column_upper_limit: (JOB_START_TIME - UPPER_LIMIT_DELAY * 1000)
}
end
def records
[
@@ -863,9 +869,10 @@
fetch_unknown_columns: false,
fetch_custom_properties: false,
retry_initial_wait_sec: 2,
retry_limit: 3,
latest_fetched_time: 0,
+ incremental_column_upper_limit_delay_in_seconds: UPPER_LIMIT_DELAY
}
end
def embulk_config
DataSource[*config.to_a.flatten(1)]