Sha256: 20403f3ad906197d01a4526370019967e90db6e36ca980569aeffa60e6ba30cd
Contents?: true
Size: 1.85 KB
Versions: 1
Compression:
Stored size: 1.85 KB
Contents
# frozen_string_literal: true

# Shared RSpec context that wires a Karafka consumer instance to a stubbed
# Kafka client so consumer classes can be exercised without a real broker.
RSpec.shared_context "with sbmt karafka consumer" do
  subject(:consume_with_sbmt_karafka) do
    coordinator.increment
    consumer.on_consume
  end

  let(:coordinator) {
    instance = Karafka::Processing::Coordinator.new(test_topic, 0, instance_double(Karafka::TimeTrackers::Pause))
    instance.instance_variable_set(:@seek_offset, -1)
    instance
  }

  let(:test_consumer_group) { Karafka::Routing::ConsumerGroup.new(:test_group) }
  let(:test_topic) { Karafka::Routing::Topic.new(:test_topic, test_consumer_group) }
  let(:kafka_client) { instance_double(Karafka::Connection::Client) }
  let(:null_deserializer) { Sbmt::KafkaConsumer::Serialization::NullDeserializer.new }
  let(:consumer) { build_consumer(described_class.new) }

  before {
    Sbmt::KafkaConsumer::ClientConfigurer.configure!
    allow(kafka_client).to receive(:assignment_lost?).and_return(false)
    allow(kafka_client).to receive(:mark_as_consumed!).and_return(true)
  }

  # Wraps the raw payload in a Karafka message and assigns it to the consumer
  # as a single-message batch; metadata defaults can be overridden via opts.
  def publish_to_sbmt_karafka(raw_payload, opts = {})
    message = Karafka::Messages::Message.new(raw_payload, Karafka::Messages::Metadata.new(metadata_defaults.merge(opts)))
    consumer.messages = Karafka::Messages::Messages.new(
      [message],
      Karafka::Messages::BatchMetadata.new(
        topic: test_topic.name,
        partition: 0,
        processed_at: Time.zone.now,
        created_at: Time.zone.now
      )
    )
  end

  # @return [Hash] message default options
  def metadata_defaults
    {
      deserializer: null_deserializer,
      headers: {},
      key: nil,
      offset: 0,
      partition: 0,
      received_at: Time.current,
      topic: test_topic.name
    }
  end

  # Connects the consumer under test to the test coordinator, the stubbed
  # client, and Karafka's default processing strategy.
  def build_consumer(instance)
    instance.coordinator = coordinator
    instance.client = kafka_client
    instance.singleton_class.include Karafka::Processing::Strategies::Default
    instance
  end
end
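The shared context above is meant to be pulled into a consumer spec. A minimal usage sketch, assuming the file is required from the path listed in the version table below and that MyConsumer is a hypothetical consumer class whose no-argument constructor works with described_class.new:

require "sbmt/kafka_consumer/testing/shared_contexts/with_sbmt_karafka_consumer"

RSpec.describe MyConsumer do
  include_context "with sbmt karafka consumer"

  it "consumes a published payload" do
    # publish_to_sbmt_karafka builds a single-message batch from the raw payload;
    # metadata such as :key or :headers can be overridden via the opts hash.
    publish_to_sbmt_karafka('{"id":1}', key: "order-1")

    # consume_with_sbmt_karafka increments the coordinator and triggers on_consume.
    expect { consume_with_sbmt_karafka }.not_to raise_error
  end
end

The helpers publish_to_sbmt_karafka and consume_with_sbmt_karafka come from the shared context itself; the payload and expectation here are illustrative only.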
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path
--- | ---
sbmt-kafka_consumer-2.0.0 | lib/sbmt/kafka_consumer/testing/shared_contexts/with_sbmt_karafka_consumer.rb