Sha256: 1db41e4660ea0d8979e557cfc0a35151ceafc7689539990bdb87e54a60214638

Contents?: true

Size: 1.52 KB

Versions: 1

Compression:

Stored size: 1.52 KB

Contents

RSpec.describe "End to end" do
  let(:logger) { NulogyMessageBusConsumer::NullLogger.new }
  let(:topic) { TestTopic.new }
  let(:tap) { MiddlewareTap.new }
  let(:message_handler_spy) { double }
  subject(:pipeline) do
    pipeline = NulogyMessageBusConsumer.recommended_consumer_pipeline(config: topic.config)
    pipeline.insert(tap, after: NulogyMessageBusConsumer::Steps::ConnectToMessageBus)
    pipeline.append(message_handler_spy)
    pipeline
  end

  after { topic.close }

  it "receives messages using the full pipeline" do
    create_topic(topic.topic_name)

    called = false
    expect(message_handler_spy).to receive(:call) do |message:, **_kargs|
      expect(message).to have_attributes(event_data: {data: "Some Payload"})
      called = true
      :success
    end

    pipeline_thread = start(pipeline, tap)

    topic.produce_one_message(
      key: "Some Key",
      payload: message_payload(data: "Some Payload")
    )

    NulogyMessageBusConsumer::KafkaUtils.wait_for { called }
    Thread.kill(pipeline_thread)
  end

  def start(pipeline, tap)
    thr = Thread.new { pipeline.invoke }
    wait_for_partition_assignment(tap)
    thr
  end

  def wait_for_partition_assignment(tap)
    NulogyMessageBusConsumer::KafkaUtils.wait_for { tap.arguments.fetch(:kafka_consumer, nil) }
    NulogyMessageBusConsumer::KafkaUtils.wait_for_assignment(tap.arguments[:kafka_consumer])
  end

  def message_payload(**payload)
    JSON.dump(
      id: SecureRandom.uuid,
      created_at: 1_000,
      event_json: JSON.dump(payload)
    )
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-2.0.1 spec/integration/nulogy_message_bus_consumer/end_to_end_spec.rb