lib/fluq/input/kafka.rb in fluq-kafka-0.8.0 vs lib/fluq/input/kafka.rb in fluq-kafka-0.8.1

- old
+ new

@@ -12,10 +12,12 @@ # Default: 1048576 (1MB) # @option options [Integer] :max_wait_ms how long to block until the server sends us data. # Default: 100 (100ms) # @option options [Integer] :min_bytes smallest amount of data the server should send us. # Default: 0 (send us data as soon as it is ready) + # @option options [Class] :consumer_class the consumer class to use. + # Ddefault: FluQ::Kafka::Consumer # # @raises [ArgumentError] when no topic provided # # @example # @@ -62,11 +64,11 @@ end protected def consumer - @consumer ||= ::Poseidon::ConsumerGroup.new config[:group], config[:brokers], config[:zookeepers], config[:topic], + @consumer ||= config[:consumer_class].new config[:group], config[:brokers], config[:zookeepers], config[:topic], min_bytes: config[:min_bytes], max_bytes: config[:max_bytes], max_wait_ms: config[:max_wait_ms] end @@ -79,15 +81,19 @@ group: "fluq", min_bytes: 0, max_bytes: (1024 * 1024), max_wait_ms: 100, brokers: ["localhost:9092"], - zookeepers: ["localhost:2181"] + zookeepers: ["localhost:2181"], + consumer_class: ::FluQ::Kafka::Consumer + end def before_terminate @consumer.close if @consumer rescue ThreadError + ensure + @consumer = nil end end