Sha256: e0dd7427f1d39806aaf98f781cc75b71ed9bb30184cdc3e8090f4119c386b7ef

Contents?: true

Size: 1.11 KB

Versions: 2

Compression:

Stored size: 1.11 KB

Contents

RSpec.describe NulogyMessageBusConsumer::Steps::ConnectToMessageBus do
  let(:logger) { NulogyMessageBusConsumer::NullLogger.new }
  let(:topic) { TestTopic.new }
  let(:tap) { MiddlewareTap.new }
  let(:message_handler_spy) { double }
  let(:pipeline) do
    NulogyMessageBusConsumer::Pipeline.new([
      NulogyMessageBusConsumer::Steps::ConnectToMessageBus.new(topic.config, logger, kafka_consumer: topic.consumer),
      NulogyMessageBusConsumer::Steps::StreamMessagesUntilNoneAreLeft.new(logger, Kafka::CONSUMER_POLL_TIMEOUT),
      message_handler_spy
    ])
  end

  after { topic.close }

  it "receives messages" do
    create_topic(topic.topic_name)

    expect(message_handler_spy).to receive(:call) do |message:, **_kargs|
      expect(message).to have_attributes(event_data: {data: "Some Payload"})
      :success
    end

    topic.produce_one_message(
      key: "Some Key",
      payload: message_payload(data: "Some Payload")
    )

    pipeline.invoke
  end

  def message_payload(**payload)
    JSON.dump(
      id: SecureRandom.uuid,
      created_at: 1_000,
      event_json: JSON.dump(payload)
    )
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-3.0.0 spec/integration/nulogy_message_bus_consumer/steps/connect_to_message_bus_spec.rb
nulogy_message_bus_consumer-2.0.1 spec/integration/nulogy_message_bus_consumer/steps/connect_to_message_bus_spec.rb