Sha256: 758b63cc8e26d94fcc9d2c19389e91eadbc1e4d8eaf5e2a498030568225146f5

Contents?: true

Size: 1001 Bytes

Versions: 1

Compression:

Stored size: 1001 Bytes

Contents

# Spec helper that bundles one topic name with a lazily created consumer and
# producer for it.
#
# All broker access goes through a `Kafka` helper module that is defined
# outside this file (presumably elsewhere in the spec support code — confirm).
# This class only relies on it responding to `random_topic_name`,
# `setup_kafka_consumer`, `setup_kafka_producer` and `test_config`, and on the
# constant `Kafka::CONSUMER_POLL_TIMEOUT`.
class TestTopic
  attr_reader :topic_name

  # @param topic_name [Object] name of the topic to use; when omitted the
  #   value of `Kafka.random_topic_name` is used.
  def initialize(topic_name: Kafka.random_topic_name)
    @topic_name = topic_name
  end

  # Consumer for this topic, built on first use and memoized afterwards.
  def consumer
    @consumer ||= Kafka.setup_kafka_consumer(topic_name)
  end

  # Producer, built on first use and memoized afterwards.
  def producer
    @producer ||= Kafka.setup_kafka_producer
  end

  # Produces a single message to this topic and waits on the handle returned
  # by the producer.
  #
  # @param key [String] message key.
  # @param payload [String] message payload; ignored when `event_json:` is
  #   supplied (see below).
  # @param kwargs [Hash] extra options forwarded unchanged to
  #   `producer.produce`. Two keys are treated specially:
  #   * `:topic` is rejected, because the topic is fixed by this object.
  #   * `:event_json` is removed and replaced by a generated payload of the
  #     form `{"id": <uuid>, "event_json": <JSON string of the given value>}`.
  # @return [String] a freshly generated UUID. It is embedded in the payload
  #   only when `event_json:` was given; otherwise it is returned but does not
  #   appear in the message.
  # @raise [RuntimeError] if `:topic` is passed.
  def produce_one_message(key: "TEST KEY", payload: '{ "KEY": "TEST PAYLOAD" }', **kwargs)
    if kwargs.key?(:topic)
      raise "Do not specify the topic when producing with a TestTopic. Create a new TestTopic instead."
    end

    id = SecureRandom.uuid
    if kwargs.key?(:event_json)
      # Replace the payload directly instead of stashing it in kwargs, so the
      # produce call below receives exactly one :payload key and does not
      # depend on which duplicate keyword wins.
      payload = JSON.dump({id: id, event_json: JSON.dump(kwargs.delete(:event_json))})
    end

    producer.produce(
      topic: topic_name,
      key: key,
      payload: payload,
      **kwargs
    ).wait

    id
  end

  # Polls the consumer once, using `Kafka::CONSUMER_POLL_TIMEOUT` as the
  # timeout argument, and returns whatever `poll` returns.
  def consume_one_message
    consumer.poll(Kafka::CONSUMER_POLL_TIMEOUT)
  end

  # Returns `Kafka.test_config` for this topic.
  def config
    Kafka.test_config(topic_name)
  end

  # Closes the producer and the consumer if they were ever created.
  #
  # The consumer is closed even when closing the producer raises, so a failing
  # producer shutdown cannot leak the consumer; the producer's error still
  # propagates to the caller.
  #
  # @return the result of closing the consumer, or nil if none was created.
  def close
    begin
      @producer&.close
    ensure
      consumer_result = @consumer&.close
    end
    consumer_result
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
nulogy_message_bus_consumer-2.0.1 spec/support/test_topic.rb