lib/deimos/test_helpers.rb in deimos-ruby-1.0.0 vs lib/deimos/test_helpers.rb in deimos-ruby-1.1.0.pre.beta1

- old
+ new

@@ -89,11 +89,11 @@ block.call end end end - # Stub all already-loaded producers and consumers fir unit testing purposes. + # Stub all already-loaded producers and consumers for unit testing purposes. def stub_producers_and_consumers! Deimos::TestHelpers.sent_messages.clear allow(Deimos::Producer).to receive(:produce_batch) do |_, batch| Deimos::TestHelpers.sent_messages.concat(batch.map(&:to_h)) @@ -104,14 +104,21 @@ stub_producer(klass) end Deimos::Consumer.descendants.each do |klass| + # TODO: remove this when ActiveRecordConsumer uses batching next if klass == Deimos::ActiveRecordConsumer # "abstract" class stub_consumer(klass) end + + Deimos::BatchConsumer.descendants.each do |klass| + next if klass == Deimos::ActiveRecordConsumer # "abstract" class + + stub_batch_consumer(klass) + end end # Stub a given producer class. # @param klass [Class < Deimos::Producer] def stub_producer(klass) @@ -124,22 +131,26 @@ end # Stub a given consumer class. # @param klass [Class < Deimos::Consumer] def stub_consumer(klass) - allow(klass).to receive(:decoder) do - create_decoder(klass.config[:schema], klass.config[:namespace]) - end + _stub_base_consumer(klass) klass.class_eval do alias_method(:old_consume, :consume) unless self.instance_methods.include?(:old_consume) end allow_any_instance_of(klass).to receive(:consume) do |instance, payload, metadata| metadata[:key] = klass.new.decode_key(metadata[:key]) instance.old_consume(payload, metadata) end end + # Stub a given batch consumer class. + # @param klass [Class < Deimos::BatchConsumer] + def stub_batch_consumer(klass) + _stub_base_consumer(klass) + end + # get the difference of 2 hashes. # @param hash1 [Hash] # @param hash2 [Hash] def _hash_diff(hash1, hash2) if hash1.nil? || !hash1.is_a?(Hash) @@ -312,10 +323,106 @@ listener_metadata: { topic: 'my-topic' } ).send(:process_message, payload) }.to raise_error(Avro::SchemaValidator::ValidationError) end + # Test that a given handler will consume a given batch payload correctly, + # i.e. that the Avro schema is correct. If + # a block is given, that block will be executed when `consume` is called. + # Otherwise it will just confirm that `consume` is called at all. + # @param handler_class_or_topic [Class|String] Class which inherits from + # Deimos::Consumer or the topic as a string + # @param payloads [Array<Hash>] the payload to consume + def test_consume_batch(handler_class_or_topic, + payloads, + keys: [], + partition_keys: [], + call_original: false, + skip_expectation: false, + &block) + if call_original && block_given? + raise 'Cannot have both call_original and be given a block!' + end + + topic_name = 'my-topic' + handler_class = if handler_class_or_topic.is_a?(String) + _get_handler_class_from_topic(handler_class_or_topic) + else + handler_class_or_topic + end + handler = handler_class.new + allow(handler_class).to receive(:new).and_return(handler) + listener = double('listener', + handler_class: handler_class, + encoding: nil) + batch_messages = payloads.zip(keys, partition_keys).map do |payload, key, partition_key| + key ||= _key_from_consumer(handler_class) + + double('message', + 'key' => key, + 'partition_key' => partition_key, + 'partition' => 1, + 'offset' => 1, + 'value' => payload) + end + batch = double('fetched_batch', + 'messages' => batch_messages, + 'topic' => topic_name, + 'partition' => 1, + 'offset_lag' => 0) + unless skip_expectation + expectation = expect(handler).to receive(:consume_batch). + with(payloads, anything, &block) + expectation.and_call_original if call_original + end + action = Phobos::Actions::ProcessBatchInline.new( + listener: listener, + batch: batch, + metadata: { topic: topic_name } + ) + allow(action).to receive(:backoff_interval).and_return(0) + allow(action).to receive(:handle_error) { |e| raise e } + action.send(:execute) + end + + # Check to see that a given message will fail due to Avro errors. + # @param handler_class [Class] + # @param payloads [Array<Hash>] + def test_consume_batch_invalid_message(handler_class, payloads) + topic_name = 'my-topic' + handler = handler_class.new + allow(handler_class).to receive(:new).and_return(handler) + listener = double('listener', + handler_class: handler_class, + encoding: nil) + batch_messages = payloads.map do |payload| + key ||= _key_from_consumer(handler_class) + + double('message', + 'key' => key, + 'partition' => 1, + 'offset' => 1, + 'value' => payload) + end + batch = double('fetched_batch', + 'messages' => batch_messages, + 'topic' => topic_name, + 'partition' => 1, + 'offset_lag' => 0) + + action = Phobos::Actions::ProcessBatchInline.new( + listener: listener, + batch: batch, + metadata: { topic: topic_name } + ) + allow(action).to receive(:backoff_interval).and_return(0) + allow(action).to receive(:handle_error) { |e| raise e } + + expect { action.send(:execute) }. + to raise_error(Avro::SchemaValidator::ValidationError) + end + # @param schema1 [String|Hash] a file path, JSON string, or # hash representing a schema. # @param schema2 [String|Hash] a file path, JSON string, or # hash representing a schema. # @return [Boolean] true if the schemas are compatible, false otherwise. @@ -349,8 +456,16 @@ listeners = Phobos.config['listeners'] handler = listeners.find { |l| l.topic == topic } raise "No consumer found in Phobos configuration for topic #{topic}!" if handler.nil? handler.handler.constantize + end + + # Stub shared methods between consumers/batch consumers + # @param [Class < Deimos::BaseConsumer] klass Consumer class to stub + def _stub_base_consumer(klass) + allow(klass).to receive(:decoder) do + create_decoder(klass.config[:schema], klass.config[:namespace]) + end end end end