Sha256: e0dd7427f1d39806aaf98f781cc75b71ed9bb30184cdc3e8090f4119c386b7ef
Contents?: true
Size: 1.11 KB
Versions: 2
Compression:
Stored size: 1.11 KB
Contents
RSpec.describe NulogyMessageBusConsumer::Steps::ConnectToMessageBus do let(:logger) { NulogyMessageBusConsumer::NullLogger.new } let(:topic) { TestTopic.new } let(:tap) { MiddlewareTap.new } let(:message_handler_spy) { double } let(:pipeline) do NulogyMessageBusConsumer::Pipeline.new([ NulogyMessageBusConsumer::Steps::ConnectToMessageBus.new(topic.config, logger, kafka_consumer: topic.consumer), NulogyMessageBusConsumer::Steps::StreamMessagesUntilNoneAreLeft.new(logger, Kafka::CONSUMER_POLL_TIMEOUT), message_handler_spy ]) end after { topic.close } it "receives messages" do create_topic(topic.topic_name) expect(message_handler_spy).to receive(:call) do |message:, **_kargs| expect(message).to have_attributes(event_data: {data: "Some Payload"}) :success end topic.produce_one_message( key: "Some Key", payload: message_payload(data: "Some Payload") ) pipeline.invoke end def message_payload(**payload) JSON.dump( id: SecureRandom.uuid, created_at: 1_000, event_json: JSON.dump(payload) ) end end
Version data entries
2 entries across 2 versions & 1 rubygems