Sha256: 5171c295b48e1888e60e1feeb94ebdddfef6a511d70bca0e8b13d18528c844a8
Contents?: true
Size: 1.9 KB
Versions: 1
Compression:
Stored size: 1.9 KB
Contents
require "open3" require "poseidon" # Helper functions for integration testing with Kafka module KafkaHelper extend self ZOOKEEPER_ADDR = "localhost:2181" KAFKA_ADDR = "localhost:9092" def delete_topic(topic) log "*** Deleting Kafka topic: #{topic}" topic_command :delete, topic: topic end def create_topic(topic) log "*** Creating Kafka topic: #{topic}" required_topic_command :create, "replication-factor" => 1, "partitions" => 1, "topic" => topic end def list_topics topic_command(:list).split("\n") end def topic_exists?(topic) list_topics.include?(topic) end def fill_topic(topic, n = 100_000) fail ArgumentError, "min messages is 1000" if n < 1000 producer = Poseidon::Producer.new([KAFKA_ADDR], "my_test_producer", type: :sync) log "*** Filling topic with #{n} messages: #{topic}" (n / 1000).times do |i| messages = [] 1000.times do |j| n = (i * 1000 + j) messages << Poseidon::MessageToSend.new(topic, n.to_s) end producer.send_messages(messages) end ensure producer.close if producer end private def kafka_path File.expand_path("../../../../kafka", __FILE__) end def kafka_topics_bin_path "#{kafka_path}/bin/kafka-topics.sh" end def kafka_args(args = {}) { zookeeper: ZOOKEEPER_ADDR }.merge(args).map { |k, v| "--#{k} #{v}" }.join(" ") end def topic_command(command, args = {}) cmd = "#{kafka_topics_bin_path} --#{command} #{kafka_args(args)}" stdout_str, _stderr_str, status = Open3.capture3(cmd) return unless status.success? stdout_str end def required_topic_command(command, args = {}) result = topic_command(command, args) fail "Kafka command failed!" unless result true end def log(message) STDERR.puts(message) end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
turbine-1.0.0.pre | lib/turbine/rspec/kafka_helper.rb |