lib/fluq/input/kafka.rb in fluq-kafka-0.8.0 vs lib/fluq/input/kafka.rb in fluq-kafka-0.8.1
- old
+ new
@@ -12,10 +12,12 @@
# Default: 1048576 (1MB)
# @option options [Integer] :max_wait_ms how long to block until the server sends us data.
# Default: 100 (100ms)
# @option options [Integer] :min_bytes smallest amount of data the server should send us.
# Default: 0 (send us data as soon as it is ready)
+ # @option options [Class] :consumer_class the consumer class to use.
+ # Ddefault: FluQ::Kafka::Consumer
#
# @raises [ArgumentError] when no topic provided
#
# @example
#
@@ -62,11 +64,11 @@
end
protected
def consumer
- @consumer ||= ::Poseidon::ConsumerGroup.new config[:group], config[:brokers], config[:zookeepers], config[:topic],
+ @consumer ||= config[:consumer_class].new config[:group], config[:brokers], config[:zookeepers], config[:topic],
min_bytes: config[:min_bytes],
max_bytes: config[:max_bytes],
max_wait_ms: config[:max_wait_ms]
end
@@ -79,15 +81,19 @@
group: "fluq",
min_bytes: 0,
max_bytes: (1024 * 1024),
max_wait_ms: 100,
brokers: ["localhost:9092"],
- zookeepers: ["localhost:2181"]
+ zookeepers: ["localhost:2181"],
+ consumer_class: ::FluQ::Kafka::Consumer
+
end
def before_terminate
@consumer.close if @consumer
rescue ThreadError
+ ensure
+ @consumer = nil
end
end