lib/deimos/test_helpers.rb in deimos-ruby-1.0.0 vs lib/deimos/test_helpers.rb in deimos-ruby-1.1.0.pre.beta1
- old
+ new
@@ -89,11 +89,11 @@
block.call
end
end
end
- # Stub all already-loaded producers and consumers fir unit testing purposes.
+ # Stub all already-loaded producers and consumers for unit testing purposes.
def stub_producers_and_consumers!
Deimos::TestHelpers.sent_messages.clear
allow(Deimos::Producer).to receive(:produce_batch) do |_, batch|
Deimos::TestHelpers.sent_messages.concat(batch.map(&:to_h))
@@ -104,14 +104,21 @@
stub_producer(klass)
end
Deimos::Consumer.descendants.each do |klass|
+ # TODO: remove this when ActiveRecordConsumer uses batching
next if klass == Deimos::ActiveRecordConsumer # "abstract" class
stub_consumer(klass)
end
+
+ Deimos::BatchConsumer.descendants.each do |klass|
+ next if klass == Deimos::ActiveRecordConsumer # "abstract" class
+
+ stub_batch_consumer(klass)
+ end
end
# Stub a given producer class.
# @param klass [Class < Deimos::Producer]
def stub_producer(klass)
@@ -124,22 +131,26 @@
end
# Stub a given consumer class.
# @param klass [Class < Deimos::Consumer]
def stub_consumer(klass)
- allow(klass).to receive(:decoder) do
- create_decoder(klass.config[:schema], klass.config[:namespace])
- end
+ _stub_base_consumer(klass)
klass.class_eval do
alias_method(:old_consume, :consume) unless self.instance_methods.include?(:old_consume)
end
allow_any_instance_of(klass).to receive(:consume) do |instance, payload, metadata|
metadata[:key] = klass.new.decode_key(metadata[:key])
instance.old_consume(payload, metadata)
end
end
+ # Stub a given batch consumer class.
+ # @param klass [Class < Deimos::BatchConsumer]
+ def stub_batch_consumer(klass)
+ _stub_base_consumer(klass)
+ end
+
# get the difference of 2 hashes.
# @param hash1 [Hash]
# @param hash2 [Hash]
def _hash_diff(hash1, hash2)
if hash1.nil? || !hash1.is_a?(Hash)
@@ -312,10 +323,106 @@
listener_metadata: { topic: 'my-topic' }
).send(:process_message, payload)
}.to raise_error(Avro::SchemaValidator::ValidationError)
end
+ # Test that a given handler will consume a given batch payload correctly,
+ # i.e. that the Avro schema is correct. If
+ # a block is given, that block will be executed when `consume` is called.
+ # Otherwise it will just confirm that `consume` is called at all.
+ # @param handler_class_or_topic [Class|String] Class which inherits from
+ # Deimos::Consumer or the topic as a string
+ # @param payloads [Array<Hash>] the payload to consume
+ def test_consume_batch(handler_class_or_topic,
+ payloads,
+ keys: [],
+ partition_keys: [],
+ call_original: false,
+ skip_expectation: false,
+ &block)
+ if call_original && block_given?
+ raise 'Cannot have both call_original and be given a block!'
+ end
+
+ topic_name = 'my-topic'
+ handler_class = if handler_class_or_topic.is_a?(String)
+ _get_handler_class_from_topic(handler_class_or_topic)
+ else
+ handler_class_or_topic
+ end
+ handler = handler_class.new
+ allow(handler_class).to receive(:new).and_return(handler)
+ listener = double('listener',
+ handler_class: handler_class,
+ encoding: nil)
+ batch_messages = payloads.zip(keys, partition_keys).map do |payload, key, partition_key|
+ key ||= _key_from_consumer(handler_class)
+
+ double('message',
+ 'key' => key,
+ 'partition_key' => partition_key,
+ 'partition' => 1,
+ 'offset' => 1,
+ 'value' => payload)
+ end
+ batch = double('fetched_batch',
+ 'messages' => batch_messages,
+ 'topic' => topic_name,
+ 'partition' => 1,
+ 'offset_lag' => 0)
+ unless skip_expectation
+ expectation = expect(handler).to receive(:consume_batch).
+ with(payloads, anything, &block)
+ expectation.and_call_original if call_original
+ end
+ action = Phobos::Actions::ProcessBatchInline.new(
+ listener: listener,
+ batch: batch,
+ metadata: { topic: topic_name }
+ )
+ allow(action).to receive(:backoff_interval).and_return(0)
+ allow(action).to receive(:handle_error) { |e| raise e }
+ action.send(:execute)
+ end
+
+ # Check to see that a given message will fail due to Avro errors.
+ # @param handler_class [Class]
+ # @param payloads [Array<Hash>]
+ def test_consume_batch_invalid_message(handler_class, payloads)
+ topic_name = 'my-topic'
+ handler = handler_class.new
+ allow(handler_class).to receive(:new).and_return(handler)
+ listener = double('listener',
+ handler_class: handler_class,
+ encoding: nil)
+ batch_messages = payloads.map do |payload|
+ key ||= _key_from_consumer(handler_class)
+
+ double('message',
+ 'key' => key,
+ 'partition' => 1,
+ 'offset' => 1,
+ 'value' => payload)
+ end
+ batch = double('fetched_batch',
+ 'messages' => batch_messages,
+ 'topic' => topic_name,
+ 'partition' => 1,
+ 'offset_lag' => 0)
+
+ action = Phobos::Actions::ProcessBatchInline.new(
+ listener: listener,
+ batch: batch,
+ metadata: { topic: topic_name }
+ )
+ allow(action).to receive(:backoff_interval).and_return(0)
+ allow(action).to receive(:handle_error) { |e| raise e }
+
+ expect { action.send(:execute) }.
+ to raise_error(Avro::SchemaValidator::ValidationError)
+ end
+
# @param schema1 [String|Hash] a file path, JSON string, or
# hash representing a schema.
# @param schema2 [String|Hash] a file path, JSON string, or
# hash representing a schema.
# @return [Boolean] true if the schemas are compatible, false otherwise.
@@ -349,8 +456,16 @@
listeners = Phobos.config['listeners']
handler = listeners.find { |l| l.topic == topic }
raise "No consumer found in Phobos configuration for topic #{topic}!" if handler.nil?
handler.handler.constantize
+ end
+
+ # Stub shared methods between consumers/batch consumers
+ # @param [Class < Deimos::BaseConsumer] klass Consumer class to stub
+ def _stub_base_consumer(klass)
+ allow(klass).to receive(:decoder) do
+ create_decoder(klass.config[:schema], klass.config[:namespace])
+ end
end
end
end