Sha256: 1db41e4660ea0d8979e557cfc0a35151ceafc7689539990bdb87e54a60214638
Contents?: true
Size: 1.52 KB
Versions: 1
Compression:
Stored size: 1.52 KB
Contents
# Integration spec: builds the recommended consumer pipeline against a test
# topic, produces one message, and checks that the handler appended to the end
# of the pipeline is called with the decoded event data.
RSpec.describe "End to end" do
  let(:logger) { NulogyMessageBusConsumer::NullLogger.new }
  let(:topic) { TestTopic.new }
  let(:tap) { MiddlewareTap.new }
  let(:message_handler_spy) { double }

  # The pipeline under test: the recommended pipeline, with the tap inserted
  # after the connection step and the spy appended as the final step.
  subject(:pipeline) do
    built = NulogyMessageBusConsumer.recommended_consumer_pipeline(config: topic.config)
    built.insert(tap, after: NulogyMessageBusConsumer::Steps::ConnectToMessageBus)
    built.append(message_handler_spy)
    built
  end

  after { topic.close }

  it "receives messages using the full pipeline" do
    create_topic(topic.topic_name)

    # The handler is invoked from the pipeline thread, so it only records the
    # message. The assertion is made below on the example's own thread, where
    # a mismatch is reported as a normal expectation failure.
    received_message = nil
    expect(message_handler_spy).to receive(:call) do |message:, **_kwargs|
      received_message = message
      :success
    end

    pipeline_thread = nil
    begin
      pipeline_thread = start(pipeline, tap)
      topic.produce_one_message(
        key: "Some Key",
        payload: message_payload(data: "Some Payload")
      )
      NulogyMessageBusConsumer::KafkaUtils.wait_for { !received_message.nil? }
    ensure
      # Stop the consumer thread even when the wait above raises, so it does
      # not keep running into later examples.
      pipeline_thread&.kill
    end

    expect(received_message).to have_attributes(event_data: {data: "Some Payload"})
  end

  # Runs the pipeline on a background thread and returns that thread once
  # wait_for_partition_assignment has returned.
  # If the wait raises, the thread is killed before the error propagates,
  # because the caller never receives a handle to stop it.
  def start(pipeline, tap)
    thread = Thread.new { pipeline.invoke }
    assigned = false
    wait_for_partition_assignment(tap)
    assigned = true
    thread
  ensure
    thread.kill if thread && !assigned
  end

  # Waits until the tap has captured a :kafka_consumer argument, then hands
  # that consumer to KafkaUtils.wait_for_assignment.
  def wait_for_partition_assignment(tap)
    NulogyMessageBusConsumer::KafkaUtils.wait_for { tap.arguments.fetch(:kafka_consumer, nil) }
    NulogyMessageBusConsumer::KafkaUtils.wait_for_assignment(tap.arguments[:kafka_consumer])
  end

  # Builds the JSON envelope for a produced message: a random id, a fixed
  # created_at, and the given payload JSON-encoded under event_json.
  def message_payload(**payload)
    JSON.dump(
      id: SecureRandom.uuid,
      created_at: 1_000,
      event_json: JSON.dump(payload)
    )
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
nulogy_message_bus_consumer-2.0.1 | spec/integration/nulogy_message_bus_consumer/end_to_end_spec.rb |