Sha256: 758b63cc8e26d94fcc9d2c19389e91eadbc1e4d8eaf5e2a498030568225146f5
Contents?: true
Size: 1001 Bytes
Versions: 1
Compression:
Stored size: 1001 Bytes
Contents
class TestTopic
  attr_reader :topic_name

  def initialize(topic_name: Kafka.random_topic_name)
    @topic_name = topic_name
  end

  def consumer
    @consumer ||= Kafka.setup_kafka_consumer(topic_name)
  end

  def producer
    @producer ||= Kafka.setup_kafka_producer
  end

  def produce_one_message(key: "TEST KEY", payload: '{ "KEY": "TEST PAYLOAD" }', **kwargs)
    if kwargs.key?(:topic)
      raise "Do not specify the topic when producing with a TestTopic. Create a new TestTopic instead."
    end

    id = SecureRandom.uuid

    if kwargs.key?(:event_json)
      kwargs[:payload] = JSON.dump({id: id, event_json: JSON.dump(kwargs.delete(:event_json))})
    end

    producer.produce(
      topic: topic_name,
      key: key,
      payload: payload,
      **kwargs
    ).wait

    id
  end

  def consume_one_message
    consumer.poll(Kafka::CONSUMER_POLL_TIMEOUT)
  end

  def config
    Kafka.test_config(topic_name)
  end

  def close
    @producer&.close
    @consumer&.close
  end
end
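The argument handling in `produce_one_message` is easy to misread, so here is a minimal sketch of it that runs without Kafka. `build_produce_args` is a hypothetical helper written for illustration; it is not part of the gem. It only mirrors how the keyword arguments appear to be assembled before they reach `producer.produce`: when `event_json:` is given, the event is JSON-encoded twice (once on its own, once inside an envelope that carries the generated id), and the resulting `payload` in `**kwargs` overrides the explicit `payload:` keyword because later keys win.

```ruby
require "json"
require "securerandom"

# Hypothetical helper (not in the gem): mirrors the keyword handling of
# TestTopic#produce_one_message and returns the id plus the arguments that
# would be passed on to the producer.
def build_produce_args(topic_name, key: "TEST KEY", payload: '{ "KEY": "TEST PAYLOAD" }', **kwargs)
  if kwargs.key?(:topic)
    raise "Do not specify the topic when producing with a TestTopic. Create a new TestTopic instead."
  end

  id = SecureRandom.uuid

  if kwargs.key?(:event_json)
    # The event is serialized to a JSON string, then embedded in a JSON envelope.
    kwargs[:payload] = JSON.dump({id: id, event_json: JSON.dump(kwargs.delete(:event_json))})
  end

  # A :payload entry inside kwargs replaces the explicit payload: value.
  [id, {topic: topic_name, key: key, payload: payload, **kwargs}]
end

id, args = build_produce_args("test-topic", event_json: {"type" => "created"})
envelope = JSON.parse(args[:payload])          # outer envelope with "id" and "event_json"
inner = JSON.parse(envelope["event_json"])     # the event itself needs a second parse
```

A consumer reading such a message would therefore need to parse the payload and then parse its `event_json` field again to recover the original event.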
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
nulogy_message_bus_consumer-2.0.1 | spec/support/test_topic.rb |