spec/utils/db_poller_spec.rb in deimos-ruby-1.17.1 vs spec/utils/db_poller_spec.rb in deimos-ruby-1.18.0
- old
+ new
@@ -1,16 +1,19 @@
# frozen_string_literal: true
+require 'deimos/utils/db_poller'
+
# rubocop:disable Layout/LineLength
-# @param seconds [Integer]
+# @param secs [Integer]
+# @param mins [Integer]
# @return [Time]
def time_value(secs: 0, mins: 0)
Time.local(2015, 5, 5, 1, 0, 0) + (secs + (mins * 60))
end
-each_db_config(Deimos::Utils::DbPoller) do
+each_db_config(Deimos::Utils::DbPoller::Base) do
before(:each) do
Deimos::PollInfo.delete_all
end
@@ -34,11 +37,11 @@
stub_const('MyProducerWithID', producer_class)
end
it 'should raise an error if no pollers configured' do
Deimos.configure {}
- expect { described_class.start! }.to raise_error('No pollers configured!')
+ expect { Deimos::Utils::DbPoller.start! }.to raise_error('No pollers configured!')
end
it 'should start pollers as configured' do
Deimos.configure do
db_poller do
@@ -47,27 +50,27 @@
db_poller do
producer_class 'MyProducerWithID'
end
end
- allow(Deimos::Utils::DbPoller).to receive(:new)
+ allow(Deimos::Utils::DbPoller::TimeBased).to receive(:new)
signal_double = instance_double(Sigurd::SignalHandler, run!: nil)
allow(Sigurd::SignalHandler).to receive(:new).and_return(signal_double)
- described_class.start!
- expect(Deimos::Utils::DbPoller).to have_received(:new).twice
- expect(Deimos::Utils::DbPoller).to have_received(:new).
+ Deimos::Utils::DbPoller.start!
+ expect(Deimos::Utils::DbPoller::TimeBased).to have_received(:new).twice
+ expect(Deimos::Utils::DbPoller::TimeBased).to have_received(:new).
with(Deimos.config.db_poller_objects[0])
- expect(Deimos::Utils::DbPoller).to have_received(:new).
+ expect(Deimos::Utils::DbPoller::TimeBased).to have_received(:new).
with(Deimos.config.db_poller_objects[1])
end
end
describe 'pollers' do
include_context 'with widgets'
let(:poller) do
- poller = described_class.new(config)
+ poller = Deimos::Utils::DbPoller.class_for_config(config.mode).new(config)
allow(poller).to receive(:sleep)
poller
end
let(:config) { Deimos.config.db_poller_objects.first.dup }
@@ -158,10 +161,11 @@
expect(poller.should_run?).to eq(false)
end
describe '#process_batch' do
let(:widgets) { (1..3).map { Widget.create!(test_id: 'some_id', some_int: 4) } }
+ let(:status) { Deimos::Utils::DbPoller::PollStatus.new(0, 0, 0) }
before(:each) do
allow(Deimos.config.tracer).to receive(:start).and_return('a span')
allow(Deimos.config.tracer).to receive(:set_error)
allow(Deimos.config.tracer).to receive(:finish)
@@ -170,19 +174,22 @@
it 'should process the batch' do
travel_to time_value
widgets.last.update_attribute(:updated_at, time_value(mins: -30))
expect(MyProducer).to receive(:send_events).with(widgets)
poller.retrieve_poll_info
- poller.process_batch(widgets)
+ poller.process_and_touch_info(widgets, status)
info = Deimos::PollInfo.last
expect(info.last_sent.in_time_zone).to eq(time_value(mins: -30))
expect(info.last_sent_id).to eq(widgets.last.id)
end
it 'should create a span' do
poller.retrieve_poll_info
- poller.process_batch_with_span(widgets)
+ poller.process_batch_with_span(widgets, status)
+ expect(status.batches_errored).to eq(0)
+ expect(status.batches_processed).to eq(1)
+ expect(status.messages_processed).to eq(3)
expect(Deimos.config.tracer).to have_received(:finish).with('a span')
end
it 'should retry on Kafka error' do
called_once = false
@@ -192,23 +199,29 @@
called_once = true
raise Kafka::Error, 'OH NOES'
end
end
poller.retrieve_poll_info
- poller.process_batch_with_span(widgets)
+ poller.process_batch_with_span(widgets, status)
expect(poller).to have_received(:sleep).once.with(0.5)
expect(Deimos.config.tracer).to have_received(:finish).with('a span')
+ expect(status.batches_errored).to eq(0)
+ expect(status.batches_processed).to eq(1)
+ expect(status.messages_processed).to eq(3)
end
it 'should retry only once on other errors' do
error = RuntimeError.new('OH NOES')
allow(poller).to receive(:sleep)
allow(poller).to receive(:process_batch).and_raise(error)
poller.retrieve_poll_info
- poller.process_batch_with_span(widgets)
+ poller.process_batch_with_span(widgets, status)
expect(poller).to have_received(:sleep).once.with(0.5)
expect(Deimos.config.tracer).to have_received(:set_error).with('a span', error)
+ expect(status.batches_errored).to eq(1)
+ expect(status.batches_processed).to eq(0)
+ expect(status.messages_processed).to eq(3)
end
end
describe '#process_updates' do
before(:each) do
@@ -241,62 +254,62 @@
it 'should update the full table' do
info = Deimos::PollInfo.last
config.full_table = true
expect(MyProducer).to receive(:poll_query).at_least(:once).and_call_original
- expect(poller).to receive(:process_batch).ordered.
- with([old_widget, widgets[0], widgets[1]]).and_wrap_original do |m, *args|
+ expect(poller).to receive(:process_and_touch_info).ordered.
+ with([old_widget, widgets[0], widgets[1]], anything).and_wrap_original do |m, *args|
m.call(*args)
expect(info.reload.last_sent.in_time_zone).to eq(time_value(mins: -61, secs: 32))
expect(info.last_sent_id).to eq(widgets[1].id)
end
- expect(poller).to receive(:process_batch).ordered.
- with([widgets[2], widgets[3], widgets[4]]).and_call_original
- expect(poller).to receive(:process_batch).ordered.
- with([widgets[5], widgets[6]]).and_call_original
+ expect(poller).to receive(:process_and_touch_info).ordered.
+ with([widgets[2], widgets[3], widgets[4]], anything).and_call_original
+ expect(poller).to receive(:process_and_touch_info).ordered.
+ with([widgets[5], widgets[6]], anything).and_call_original
poller.process_updates
# this is the updated_at of widgets[6]
expect(info.reload.last_sent.in_time_zone).to eq(time_value(mins: -61, secs: 37))
expect(info.last_sent_id).to eq(widgets[6].id)
last_widget.update_attribute(:updated_at, time_value(mins: -250))
travel 61.seconds
# should reprocess the table
- expect(poller).to receive(:process_batch).ordered.
- with([last_widget, old_widget, widgets[0]]).and_call_original
- expect(poller).to receive(:process_batch).ordered.
- with([widgets[1], widgets[2], widgets[3]]).and_call_original
- expect(poller).to receive(:process_batch).ordered.
- with([widgets[4], widgets[5], widgets[6]]).and_call_original
+ expect(poller).to receive(:process_and_touch_info).ordered.
+ with([last_widget, old_widget, widgets[0]], anything).and_call_original
+ expect(poller).to receive(:process_and_touch_info).ordered.
+ with([widgets[1], widgets[2], widgets[3]], anything).and_call_original
+ expect(poller).to receive(:process_and_touch_info).ordered.
+ with([widgets[4], widgets[5], widgets[6]], anything).and_call_original
poller.process_updates
expect(info.reload.last_sent.in_time_zone).to eq(time_value(mins: -61, secs: 37))
expect(info.last_sent_id).to eq(widgets[6].id)
end
it 'should send events across multiple batches' do
allow(Deimos.config.logger).to receive(:info)
allow(MyProducer).to receive(:poll_query).and_call_original
- expect(poller).to receive(:process_batch).ordered.
- with([widgets[0], widgets[1], widgets[2]]).and_call_original
- expect(poller).to receive(:process_batch).ordered.
- with([widgets[3], widgets[4], widgets[5]]).and_call_original
- expect(poller).to receive(:process_batch).ordered.
- with([widgets[6]]).and_call_original
+ expect(poller).to receive(:process_and_touch_info).ordered.
+ with([widgets[0], widgets[1], widgets[2]], anything).and_call_original
+ expect(poller).to receive(:process_and_touch_info).ordered.
+ with([widgets[3], widgets[4], widgets[5]], anything).and_call_original
+ expect(poller).to receive(:process_and_touch_info).ordered.
+ with([widgets[6]], anything).and_call_original
poller.process_updates
expect(MyProducer).to have_received(:poll_query).
with(time_from: time_value(mins: -61),
time_to: time_value(secs: -2),
column_name: :updated_at,
min_id: 0)
travel 61.seconds
# process the last widget which came in during the delay
- expect(poller).to receive(:process_batch).with([last_widget]).
+ expect(poller).to receive(:process_and_touch_info).with([last_widget], anything).
and_call_original
poller.process_updates
# widgets[6] updated_at value
expect(MyProducer).to have_received(:poll_query).
@@ -305,21 +318,21 @@
column_name: :updated_at,
min_id: widgets[6].id)
travel 61.seconds
# nothing else to process
- expect(poller).not_to receive(:process_batch)
+ expect(poller).not_to receive(:process_and_touch_info)
poller.process_updates
poller.process_updates
expect(MyProducer).to have_received(:poll_query).twice.
with(time_from: time_value(secs: -1),
time_to: time_value(secs: 120), # plus 122 seconds minus 2 seconds
column_name: :updated_at,
min_id: last_widget.id)
expect(Deimos.config.logger).to have_received(:info).
- with('Poll my-topic-with-id complete at 2015-05-05 00:59:58 -0400 (7 messages, 3 successful batches, 0 batches errored}')
+ with('Poll my-topic-with-id complete at 2015-05-05 00:59:58 -0400 (3 batches, 0 errored batches, 7 processed messages)')
end
describe 'errors' do
before(:each) do
poller.config.retries = 0
@@ -352,10 +365,10 @@
info = Deimos::PollInfo.last
expect(info.last_sent.in_time_zone).to eq(time_value(mins: -61, secs: 30))
expect(info.last_sent_id).to eq(widgets[6].id)
expect(Deimos.config.logger).to have_received(:info).
- with('Poll my-topic-with-id complete at 2015-05-05 00:59:58 -0400 (7 messages, 2 successful batches, 1 batches errored}')
+ with('Poll my-topic-with-id complete at 2015-05-05 00:59:58 -0400 (2 batches, 1 errored batches, 7 processed messages)')
end
end
end
end
end