spec/utils/db_poller_spec.rb in deimos-ruby-1.24.2 vs spec/utils/db_poller_spec.rb in deimos-ruby-2.0.0.pre.alpha1
- old
+ new
@@ -18,25 +18,31 @@
end
describe '#start!' do
before(:each) do
- producer_class = Class.new(Deimos::Producer) do
- schema 'MySchema'
- namespace 'com.my-namespace'
- topic 'my-topic'
- key_config field: 'test_id'
- end
+ producer_class = Class.new(Deimos::Producer)
stub_const('MyProducer', producer_class)
- producer_class = Class.new(Deimos::Producer) do
- schema 'MySchemaWithId'
- namespace 'com.my-namespace'
- topic 'my-topic'
- key_config plain: true
- end
+ producer_class = Class.new(Deimos::Producer)
stub_const('MyProducerWithID', producer_class)
+
+ Karafka::App.routes.redraw do
+ topic 'my-topic' do
+ schema 'MySchema'
+ namespace 'com.my-namespace'
+ key_config field: 'test_id'
+ producer_class MyProducer
+ end
+ topic 'my-topic-with-id' do
+ schema 'MySchemaWithId'
+ namespace 'com.my-namespace'
+ key_config plain: true
+ producer_class MyProducerWithID
+ end
+ end
+
end
it 'should raise an error if no pollers configured' do
Deimos.configure {}
expect { Deimos::Utils::DbPoller.start! }.to raise_error('No pollers configured!')
@@ -74,25 +80,30 @@
end
let(:config) { Deimos.config.db_poller_objects.first.dup }
before(:each) do
- Widget.delete_all
producer_class = Class.new(Deimos::ActiveRecordProducer) do
- schema 'MySchemaWithId'
- namespace 'com.my-namespace'
- topic 'my-topic-with-id'
- key_config none: true
record_class Widget
# :nodoc:
def self.generate_payload(attrs, widget)
super.merge(message_id: widget.generated_id)
end
end
stub_const('MyProducer', producer_class)
+ Widget.delete_all
+ Karafka::App.routes.redraw do
+ topic 'my-topic-with-id' do
+ schema 'MySchemaWithId'
+ namespace 'com.my-namespace'
+ key_config none: true
+ producer_class MyProducer
+ end
+ end
+
Deimos.configure do
db_poller do
producer_class 'MyProducer'
run_every 1.minute
end
@@ -193,11 +204,13 @@
context 'with skip_too_large_messages on' do
before(:each) { config.skip_too_large_messages = true }
it 'should skip and move on' do
- error = Kafka::MessageSizeTooLarge.new('OH NOES')
+ rdkafka_error = instance_double(Rdkafka::RdkafkaError, code: :msg_size_too_large)
+ error = WaterDrop::Errors::ProduceManyError.new(nil, nil)
+ allow(error).to receive(:cause).and_return(rdkafka_error)
allow(poller).to receive(:sleep)
allow(poller).to receive(:process_batch) do
raise error
end
poller.retrieve_poll_info
@@ -329,11 +342,11 @@
expect(info.reload.last_sent.in_time_zone).to eq(time_value(mins: -61, secs: 37))
expect(info.last_sent_id).to eq(widgets[6].id)
end
it 'should send events across multiple batches' do
- allow(Deimos.config.logger).to receive(:info)
+ allow(Deimos::Logging).to receive(:log_info)
allow(MyProducer).to receive(:poll_query).and_call_original
expect(poller).to receive(:process_and_touch_info).ordered.
with([widgets[0], widgets[1], widgets[2]], anything).and_call_original
expect(poller).to receive(:process_and_touch_info).ordered.
with([widgets[3], widgets[4], widgets[5]], anything).and_call_original
@@ -374,11 +387,11 @@
expect(MyProducer).to have_received(:poll_query).
with(time_from: time_value(secs: 122),
time_to: time_value(secs: 120), # yes this is weird but it's because of travel_to
column_name: :updated_at,
min_id: last_widget.id)
- expect(Deimos.config.logger).to have_received(:info).
+ expect(Deimos::Logging).to have_received(:log_info).
with('Poll MyProducer: ["my-topic-with-id"] complete at 2015-05-05 00:59:58 -0400 (3 batches, 0 errored batches, 7 processed messages)')
end
it 'should update PollInfo timestamp after processing' do
poll_before = Deimos::PollInfo.last
@@ -396,11 +409,11 @@
end
describe 'errors' do
before(:each) do
poller.config.retries = 0
- allow(Deimos.config.logger).to receive(:info)
+ allow(Deimos::Logging).to receive(:log_info)
end
after(:each) do
poller.config.retries = 1
end
@@ -426,11 +439,11 @@
min_id: 0)
info = Deimos::PollInfo.last
expect(info.last_sent.in_time_zone).to eq(time_value(mins: -61, secs: 30))
expect(info.last_sent_id).to eq(widgets[6].id)
- expect(Deimos.config.logger).to have_received(:info).
+ expect(Deimos::Logging).to have_received(:log_info).
with('Poll MyProducer: ["my-topic-with-id"] complete at 2015-05-05 00:59:58 -0400 (2 batches, 1 errored batches, 7 processed messages)')
end
end
end
end
@@ -447,36 +460,34 @@
let(:config) { Deimos.config.db_poller_objects.first.dup }
before(:each) do
Widget.delete_all
producer_class = Class.new(Deimos::ActiveRecordProducer) do
- schema 'MySchemaWithId'
- namespace 'com.my-namespace'
- topic 'my-topic-with-id'
- key_config none: true
record_class Widget
# :nodoc:
def self.generate_payload(attrs, widget)
super.merge(message_id: widget.generated_id)
end
end
stub_const('ProducerOne', producer_class)
+ stub_const('ProducerTwo', producer_class)
- producer_class = Class.new(Deimos::ActiveRecordProducer) do
- schema 'MySchemaWithId'
- namespace 'com.my-namespace'
- topic 'my-topic-with-id'
- key_config none: true
- record_class Widget
-
- # :nodoc:
- def self.generate_payload(attrs, widget)
- super.merge(message_id: widget.generated_id)
+ Karafka::App.routes.redraw do
+ topic 'my-topic-with-id' do
+ schema 'MySchemaWithId'
+ namespace 'com.my-namespace'
+ key_config none: true
+ producer_class ProducerOne
end
+ topic 'my-topic-with-id2' do
+ schema 'MySchemaWithId'
+ namespace 'com.my-namespace'
+ key_config none: true
+ producer_class ProducerTwo
+ end
end
- stub_const('ProducerTwo', producer_class)
poller_class = Class.new(Deimos::Utils::DbPoller::StateBased) do
def self.producers
[ProducerOne, ProducerTwo]
end
@@ -511,11 +522,10 @@
allow(ProducerOne).to receive(:send_events)
allow(ProducerTwo).to receive(:send_events)
expect(Deimos::Utils::DbPoller::MultiProducerPoller).to receive(:poll_query).at_least(:once)
poller.process_updates
- expect(ProducerOne).to have_received(:send_events).with(widgets)
- expect(ProducerTwo).to have_received(:send_events).with(widgets)
+ expect(ProducerOne).to have_received(:send_events).twice.with(widgets)
expect(widgets.map(&:reload).map(&:publish_status)).to eq(%w(PUBLISHED PUBLISHED PUBLISHED))
end
it 'should raise an error if producer_class and poller_class are both not configured' do
Deimos.configure do