Sha256: 24f8eb4576d79d3ca955cb23177dfdc7868201e94bbfbe6490bfbc91a139cdd4

Contents?: true

Size: 1.41 KB

Versions: 1

Compression:

Stored size: 1.41 KB

Contents

require "./lib/rdkafka"

task :create_topics do
  puts "Creating test topics"
  kafka_topics = if ENV['TRAVIS']
                   'kafka/bin/kafka-topics.sh'
                 else
                   'kafka-topics'
                 end
  `#{kafka_topics} --create --topic=consume_test_topic --zookeeper=127.0.0.1:2181 --partitions=3 --replication-factor=1`
  `#{kafka_topics} --create --topic=produce_test_topic --zookeeper=127.0.0.1:2181 --partitions=3 --replication-factor=1`
  `#{kafka_topics} --create --topic=rake_test_topic --zookeeper=127.0.0.1:2181 --partitions=3 --replication-factor=1`
end

task :produce_messages do
  config = {:"bootstrap.servers" => "localhost:9092"}
  if ENV["DEBUG"]
    config[:debug] = "broker,topic,msg"
  end
  producer = Rdkafka::Config.new(config).producer
  100.times do |i|
    puts "Producing message #{i}"
    producer.produce(
        topic:   "rake_test_topic",
        payload: "Payload #{i} from Rake",
        key:     "Key #{i} from Rake"
    ).wait
  end
end

task :consume_messages do
  config = {
    :"bootstrap.servers" => "localhost:9092",
    :"group.id" => "rake_test",
    :"enable.partition.eof" => false,
    :"auto.offset.reset" => "earliest"
  }
  if ENV["DEBUG"]
    config[:debug] = "cgrp,topic,fetch"
  end
  consumer = Rdkafka::Config.new(config).consumer
  consumer.subscribe("rake_test_topic")
  consumer.each do |message|
    puts "Message received: #{message}"
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
rdkafka-0.3.0 Rakefile