spec/utils/db_poller_spec.rb in deimos-ruby-1.19.7 vs spec/utils/db_poller_spec.rb in deimos-ruby-1.20.0
- old
+ new
@@ -66,11 +66,11 @@
describe 'pollers' do
include_context 'with widgets'
let(:poller) do
- poller = Deimos::Utils::DbPoller.class_for_config(config.mode).new(config)
+ poller = Deimos::Utils::DbPoller.class_for_config(config).new(config)
allow(poller).to receive(:sleep)
poller
end
let(:config) { Deimos.config.db_poller_objects.first.dup }
@@ -383,9 +383,107 @@
expect(info.last_sent_id).to eq(widgets[6].id)
expect(Deimos.config.logger).to have_received(:info).
with('Poll my-topic-with-id complete at 2015-05-05 00:59:58 -0400 (2 batches, 1 errored batches, 7 processed messages)')
end
end
+ end
+ end
+
+ describe 'multi_producer_pollers' do
+ include_context 'with widgets'
+
+ let(:poller) do
+ poller = Deimos::Utils::DbPoller.class_for_config(config).new(config)
+ allow(poller).to receive(:sleep)
+ poller
+ end
+
+ let(:config) { Deimos.config.db_poller_objects.first.dup }
+
+ before(:each) do
+ Widget.delete_all
+ producer_class = Class.new(Deimos::ActiveRecordProducer) do
+ schema 'MySchemaWithId'
+ namespace 'com.my-namespace'
+ topic 'my-topic-with-id'
+ key_config none: true
+ record_class Widget
+
+ # :nodoc:
+ def self.generate_payload(attrs, widget)
+ super.merge(message_id: widget.generated_id)
+ end
+ end
+ stub_const('ProducerOne', producer_class)
+
+ producer_class = Class.new(Deimos::ActiveRecordProducer) do
+ schema 'MySchemaWithId'
+ namespace 'com.my-namespace'
+ topic 'my-topic-with-id'
+ key_config none: true
+ record_class Widget
+
+ # :nodoc:
+ def self.generate_payload(attrs, widget)
+ super.merge(message_id: widget.generated_id)
+ end
+ end
+ stub_const('ProducerTwo', producer_class)
+
+ poller_class = Class.new(Deimos::Utils::DbPoller::StateBased) do
+ def self.producers
+ [ProducerOne, ProducerTwo]
+ end
+
+ def self.poll_query(*)
+ Widget.where(publish_status: nil)
+ end
+ end
+ stub_const('Deimos::Utils::DbPoller::MultiProducerPoller', poller_class)
+ end
+
+ it 'should publish to two different kafka topics from two producers' do
+ Deimos.configure do
+ db_poller do
+ poller_class 'Deimos::Utils::DbPoller::MultiProducerPoller'
+ mode :state_based
+ state_column :publish_status
+ publish_timestamp_column :published_at
+ published_state 'PUBLISHED'
+ failed_state 'PUBLISH_FAILED'
+ run_every 1.minute
+ end
+ end
+
+ widgets = (1..3).map do |i|
+ Widget.create!(test_id: 'some_id', some_int: i,
+ updated_at: time_value(mins: -61, secs: 30 + i),
+ publish_status: nil, published_at: nil)
+ end
+ poller.retrieve_poll_info
+ allow(Deimos::Utils::DbPoller::MultiProducerPoller).to receive(:poll_query).and_call_original
+ allow(ProducerOne).to receive(:send_events)
+ allow(ProducerTwo).to receive(:send_events)
+ expect(Deimos::Utils::DbPoller::MultiProducerPoller).to receive(:poll_query).at_least(:once)
+ poller.process_updates
+
+ expect(ProducerOne).to have_received(:send_events).with(widgets)
+ expect(ProducerTwo).to have_received(:send_events).with(widgets)
+ expect(widgets.map(&:reload).map(&:publish_status)).to eq(%w(PUBLISHED PUBLISHED PUBLISHED))
+ end
+
+ it 'should raise an error if producer_class and poller_class are both not configured' do
+ Deimos.configure do
+ db_poller do
+ mode :state_based
+ state_column :publish_status
+ publish_timestamp_column :published_at
+ published_state 'PUBLISHED'
+ failed_state 'PUBLISH_FAILED'
+ run_every 1.minute
+ end
+ end
+ expect { described_class.new(config) }.to raise_error('No producers have been set for this DB poller!')
end
end
end
# rubocop:enable Layout/LineLength