spec/utils/db_poller_spec.rb in deimos-ruby-1.17.1 vs spec/utils/db_poller_spec.rb in deimos-ruby-1.18.0

- old
+ new

@@ -1,16 +1,19 @@ # frozen_string_literal: true +require 'deimos/utils/db_poller' + # rubocop:disable Layout/LineLength -# @param seconds [Integer] +# @param secs [Integer] +# @param mins [Integer] # @return [Time] def time_value(secs: 0, mins: 0) Time.local(2015, 5, 5, 1, 0, 0) + (secs + (mins * 60)) end -each_db_config(Deimos::Utils::DbPoller) do +each_db_config(Deimos::Utils::DbPoller::Base) do before(:each) do Deimos::PollInfo.delete_all end @@ -34,11 +37,11 @@ stub_const('MyProducerWithID', producer_class) end it 'should raise an error if no pollers configured' do Deimos.configure {} - expect { described_class.start! }.to raise_error('No pollers configured!') + expect { Deimos::Utils::DbPoller.start! }.to raise_error('No pollers configured!') end it 'should start pollers as configured' do Deimos.configure do db_poller do @@ -47,27 +50,27 @@ db_poller do producer_class 'MyProducerWithID' end end - allow(Deimos::Utils::DbPoller).to receive(:new) + allow(Deimos::Utils::DbPoller::TimeBased).to receive(:new) signal_double = instance_double(Sigurd::SignalHandler, run!: nil) allow(Sigurd::SignalHandler).to receive(:new).and_return(signal_double) - described_class.start! - expect(Deimos::Utils::DbPoller).to have_received(:new).twice - expect(Deimos::Utils::DbPoller).to have_received(:new). + Deimos::Utils::DbPoller.start! + expect(Deimos::Utils::DbPoller::TimeBased).to have_received(:new).twice + expect(Deimos::Utils::DbPoller::TimeBased).to have_received(:new). with(Deimos.config.db_poller_objects[0]) - expect(Deimos::Utils::DbPoller).to have_received(:new). + expect(Deimos::Utils::DbPoller::TimeBased).to have_received(:new). with(Deimos.config.db_poller_objects[1]) end end describe 'pollers' do include_context 'with widgets' let(:poller) do - poller = described_class.new(config) + poller = Deimos::Utils::DbPoller.class_for_config(config.mode).new(config) allow(poller).to receive(:sleep) poller end let(:config) { Deimos.config.db_poller_objects.first.dup } @@ -158,10 +161,11 @@ expect(poller.should_run?).to eq(false) end describe '#process_batch' do let(:widgets) { (1..3).map { Widget.create!(test_id: 'some_id', some_int: 4) } } + let(:status) { Deimos::Utils::DbPoller::PollStatus.new(0, 0, 0) } before(:each) do allow(Deimos.config.tracer).to receive(:start).and_return('a span') allow(Deimos.config.tracer).to receive(:set_error) allow(Deimos.config.tracer).to receive(:finish) @@ -170,19 +174,22 @@ it 'should process the batch' do travel_to time_value widgets.last.update_attribute(:updated_at, time_value(mins: -30)) expect(MyProducer).to receive(:send_events).with(widgets) poller.retrieve_poll_info - poller.process_batch(widgets) + poller.process_and_touch_info(widgets, status) info = Deimos::PollInfo.last expect(info.last_sent.in_time_zone).to eq(time_value(mins: -30)) expect(info.last_sent_id).to eq(widgets.last.id) end it 'should create a span' do poller.retrieve_poll_info - poller.process_batch_with_span(widgets) + poller.process_batch_with_span(widgets, status) + expect(status.batches_errored).to eq(0) + expect(status.batches_processed).to eq(1) + expect(status.messages_processed).to eq(3) expect(Deimos.config.tracer).to have_received(:finish).with('a span') end it 'should retry on Kafka error' do called_once = false @@ -192,23 +199,29 @@ called_once = true raise Kafka::Error, 'OH NOES' end end poller.retrieve_poll_info - poller.process_batch_with_span(widgets) + poller.process_batch_with_span(widgets, status) expect(poller).to have_received(:sleep).once.with(0.5) expect(Deimos.config.tracer).to have_received(:finish).with('a span') + expect(status.batches_errored).to eq(0) + expect(status.batches_processed).to eq(1) + expect(status.messages_processed).to eq(3) end it 'should retry only once on other errors' do error = RuntimeError.new('OH NOES') allow(poller).to receive(:sleep) allow(poller).to receive(:process_batch).and_raise(error) poller.retrieve_poll_info - poller.process_batch_with_span(widgets) + poller.process_batch_with_span(widgets, status) expect(poller).to have_received(:sleep).once.with(0.5) expect(Deimos.config.tracer).to have_received(:set_error).with('a span', error) + expect(status.batches_errored).to eq(1) + expect(status.batches_processed).to eq(0) + expect(status.messages_processed).to eq(3) end end describe '#process_updates' do before(:each) do @@ -241,62 +254,62 @@ it 'should update the full table' do info = Deimos::PollInfo.last config.full_table = true expect(MyProducer).to receive(:poll_query).at_least(:once).and_call_original - expect(poller).to receive(:process_batch).ordered. - with([old_widget, widgets[0], widgets[1]]).and_wrap_original do |m, *args| + expect(poller).to receive(:process_and_touch_info).ordered. + with([old_widget, widgets[0], widgets[1]], anything).and_wrap_original do |m, *args| m.call(*args) expect(info.reload.last_sent.in_time_zone).to eq(time_value(mins: -61, secs: 32)) expect(info.last_sent_id).to eq(widgets[1].id) end - expect(poller).to receive(:process_batch).ordered. - with([widgets[2], widgets[3], widgets[4]]).and_call_original - expect(poller).to receive(:process_batch).ordered. - with([widgets[5], widgets[6]]).and_call_original + expect(poller).to receive(:process_and_touch_info).ordered. + with([widgets[2], widgets[3], widgets[4]], anything).and_call_original + expect(poller).to receive(:process_and_touch_info).ordered. + with([widgets[5], widgets[6]], anything).and_call_original poller.process_updates # this is the updated_at of widgets[6] expect(info.reload.last_sent.in_time_zone).to eq(time_value(mins: -61, secs: 37)) expect(info.last_sent_id).to eq(widgets[6].id) last_widget.update_attribute(:updated_at, time_value(mins: -250)) travel 61.seconds # should reprocess the table - expect(poller).to receive(:process_batch).ordered. - with([last_widget, old_widget, widgets[0]]).and_call_original - expect(poller).to receive(:process_batch).ordered. - with([widgets[1], widgets[2], widgets[3]]).and_call_original - expect(poller).to receive(:process_batch).ordered. - with([widgets[4], widgets[5], widgets[6]]).and_call_original + expect(poller).to receive(:process_and_touch_info).ordered. + with([last_widget, old_widget, widgets[0]], anything).and_call_original + expect(poller).to receive(:process_and_touch_info).ordered. + with([widgets[1], widgets[2], widgets[3]], anything).and_call_original + expect(poller).to receive(:process_and_touch_info).ordered. + with([widgets[4], widgets[5], widgets[6]], anything).and_call_original poller.process_updates expect(info.reload.last_sent.in_time_zone).to eq(time_value(mins: -61, secs: 37)) expect(info.last_sent_id).to eq(widgets[6].id) end it 'should send events across multiple batches' do allow(Deimos.config.logger).to receive(:info) allow(MyProducer).to receive(:poll_query).and_call_original - expect(poller).to receive(:process_batch).ordered. - with([widgets[0], widgets[1], widgets[2]]).and_call_original - expect(poller).to receive(:process_batch).ordered. - with([widgets[3], widgets[4], widgets[5]]).and_call_original - expect(poller).to receive(:process_batch).ordered. - with([widgets[6]]).and_call_original + expect(poller).to receive(:process_and_touch_info).ordered. + with([widgets[0], widgets[1], widgets[2]], anything).and_call_original + expect(poller).to receive(:process_and_touch_info).ordered. + with([widgets[3], widgets[4], widgets[5]], anything).and_call_original + expect(poller).to receive(:process_and_touch_info).ordered. + with([widgets[6]], anything).and_call_original poller.process_updates expect(MyProducer).to have_received(:poll_query). with(time_from: time_value(mins: -61), time_to: time_value(secs: -2), column_name: :updated_at, min_id: 0) travel 61.seconds # process the last widget which came in during the delay - expect(poller).to receive(:process_batch).with([last_widget]). + expect(poller).to receive(:process_and_touch_info).with([last_widget], anything). and_call_original poller.process_updates # widgets[6] updated_at value expect(MyProducer).to have_received(:poll_query). @@ -305,21 +318,21 @@ column_name: :updated_at, min_id: widgets[6].id) travel 61.seconds # nothing else to process - expect(poller).not_to receive(:process_batch) + expect(poller).not_to receive(:process_and_touch_info) poller.process_updates poller.process_updates expect(MyProducer).to have_received(:poll_query).twice. with(time_from: time_value(secs: -1), time_to: time_value(secs: 120), # plus 122 seconds minus 2 seconds column_name: :updated_at, min_id: last_widget.id) expect(Deimos.config.logger).to have_received(:info). - with('Poll my-topic-with-id complete at 2015-05-05 00:59:58 -0400 (7 messages, 3 successful batches, 0 batches errored}') + with('Poll my-topic-with-id complete at 2015-05-05 00:59:58 -0400 (3 batches, 0 errored batches, 7 processed messages)') end describe 'errors' do before(:each) do poller.config.retries = 0 @@ -352,10 +365,10 @@ info = Deimos::PollInfo.last expect(info.last_sent.in_time_zone).to eq(time_value(mins: -61, secs: 30)) expect(info.last_sent_id).to eq(widgets[6].id) expect(Deimos.config.logger).to have_received(:info). - with('Poll my-topic-with-id complete at 2015-05-05 00:59:58 -0400 (7 messages, 2 successful batches, 1 batches errored}') + with('Poll my-topic-with-id complete at 2015-05-05 00:59:58 -0400 (2 batches, 1 errored batches, 7 processed messages)') end end end end end