lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.2.2 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.3.0.rc1
- old
+ new
@@ -1,19 +1,23 @@
require 'fluent/input'
+require 'fluent/plugin/kafka_plugin_util'
+
module Fluent
class KafkaInput < Input
Plugin.register_input('kafka', self)
config_param :format, :string, :default => 'json',
:desc => "Supported format: (json|text|ltsv|msgpack)"
config_param :message_key, :string, :default => 'message',
:desc => "For 'text' format only."
- config_param :host, :string, :default => 'localhost',
+ config_param :host, :string, :default => nil,
:desc => "Broker host"
- config_param :port, :integer, :default => 9092,
+ config_param :port, :integer, :default => nil,
:desc => "Broker port"
+ config_param :brokers, :string, :default => 'localhost:9092',
+               :desc => "List of broker host:port pairs, separated by commas; must be set."
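+               # e.g. brokers kafka01:9092,kafka02:9092 (illustrative hostnames)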
config_param :interval, :integer, :default => 1, # seconds
:desc => "Interval (Unit: seconds)"
config_param :topics, :string, :default => nil,
:desc => "Listening topics(separate with comma',')"
config_param :client_id, :string, :default => 'kafka'
@@ -28,28 +32,29 @@
config_param :add_offset_in_record, :bool, :default => false
config_param :offset_zookeeper, :string, :default => nil
config_param :offset_zk_root_node, :string, :default => '/fluent-plugin-kafka'
- # poseidon PartitionConsumer options
+ # Kafka#fetch_messages options
config_param :max_bytes, :integer, :default => nil,
:desc => "Maximum number of bytes to fetch."
- config_param :max_wait_ms, :integer, :default => nil,
+ config_param :max_wait_time, :integer, :default => nil,
:desc => "How long to block until the server sends us data."
config_param :min_bytes, :integer, :default => nil,
:desc => "Smallest amount of data the server should send us."
config_param :socket_timeout_ms, :integer, :default => nil,
:desc => "How long to wait for reply from server. Should be higher than max_wait_ms."
+ include KafkaPluginUtil::SSLSettings
+
unless method_defined?(:router)
define_method("router") { Fluent::Engine }
end
def initialize
super
- require 'poseidon'
- require 'zookeeper'
+ require 'kafka'
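+    # ruby-kafka replaces poseidon; 'zookeeper' is now required lazily in
+    # #configure, only when offset_zookeeper is set.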
end
def configure(conf)
super
@@ -71,42 +76,94 @@
if @topic_list.empty?
        raise ConfigError, "kafka: 'topics' or 'topic element' is a required parameter"
end
+ # For backward compatibility
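+    # 'host'/'port', when given, take precedence over 'brokers'; a missing
+    # half falls back to localhost / 9092.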
+ @brokers = case
+ when @host && @port
+ ["#{@host}:#{@port}"]
+ when @host
+ ["#{@host}:9092"]
+ when @port
+ ["localhost:#{@port}"]
+ else
+ @brokers
+ end
+
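+    # poseidon took max_wait_ms in milliseconds; ruby-kafka's max_wait_time
+    # is in seconds, hence the / 1000 conversion below.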
+ if conf['max_wait_ms']
+      log.warn "'max_wait_ms' parameter is deprecated. Use 'max_wait_time' (in seconds) instead"
+ @max_wait_time = conf['max_wait_ms'].to_i / 1000
+ end
+
+ @max_wait_time = @interval if @max_wait_time.nil?
+
+ require 'zookeeper' if @offset_zookeeper
+
+ @parser_proc = setup_parser
+ end
+
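+  # Resolving the format to a Proc once at configure time keeps a
+  # per-message case dispatch out of the consume hot path.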
+ def setup_parser
case @format
when 'json'
require 'yajl'
+ Proc.new { |msg, te|
+ r = Yajl::Parser.parse(msg.value)
+ add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
+ r
+ }
when 'ltsv'
require 'ltsv'
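+      # NOTE: '.first' keeps only the first record of each LTSV payload;
+      # the 0.2.2 code passed the whole parsed array through.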
+ Proc.new { |msg, te|
+ r = LTSV.parse(msg.value).first
+ add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
+ r
+ }
when 'msgpack'
require 'msgpack'
+ Proc.new { |msg, te|
+ r = MessagePack.unpack(msg.value)
+ add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
+ r
+ }
+ when 'text'
+ Proc.new { |msg, te|
+ r = {@message_key => msg.value}
+ add_offset_in_hash(r, te, msg.offset) if @add_offset_in_record
+ r
+ }
end
end
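+    # 'te' is the TopicEntry being watched; the frozen literal keys avoid
+    # allocating a fresh key String per record.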
+ def add_offset_in_hash(hash, te, offset)
+ hash['kafka_topic'.freeze] = te.topic
+ hash['kafka_partition'.freeze] = te.partition
+ hash['kafka_offset'.freeze] = offset
+ end
+
def start
super
+
@loop = Coolio::Loop.new
opt = {}
opt[:max_bytes] = @max_bytes if @max_bytes
- opt[:max_wait_ms] = @max_wait_ms if @max_wait_ms
+ opt[:max_wait_time] = @max_wait_time if @max_wait_time
opt[:min_bytes] = @min_bytes if @min_bytes
- opt[:socket_timeout_ms] = @socket_timeout_ms if @socket_timeout_ms
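+    # socket_timeout_ms is no longer forwarded; ruby-kafka's fetch API does
+    # not appear to accept it. A single Kafka client is shared by all
+    # watchers, with optional TLS material loaded via read_ssl_file.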
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id,
+ ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ ssl_client_cert: read_ssl_file(@ssl_client_cert),
+ ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key))
@zookeeper = Zookeeper.new(@offset_zookeeper) if @offset_zookeeper
@topic_watchers = @topic_list.map {|topic_entry|
offset_manager = OffsetManager.new(topic_entry, @zookeeper, @offset_zk_root_node) if @offset_zookeeper
TopicWatcher.new(
topic_entry,
- @host,
- @port,
- @client_id,
+ @kafka,
interval,
- @format,
- @message_key,
- @add_offset_in_record,
+ @parser_proc,
@add_prefix,
@add_suffix,
offset_manager,
router,
opt)
@@ -118,130 +175,83 @@
end
def shutdown
@loop.stop
@zookeeper.close! if @zookeeper
+ @thread.join
+ @kafka.close
super
end
def run
@loop.run
- rescue
- $log.error "unexpected error", :error=>$!.to_s
+ rescue => e
+ $log.error "unexpected error", :error => e.to_s
$log.error_backtrace
end
class TopicWatcher < Coolio::TimerWatcher
- def initialize(topic_entry, host, port, client_id, interval, format, message_key, add_offset_in_record, add_prefix, add_suffix, offset_manager, router, options={})
+ def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, options={})
@topic_entry = topic_entry
- @host = host
- @port = port
- @client_id = client_id
+ @kafka = kafka
@callback = method(:consume)
- @format = format
- @message_key = message_key
- @add_offset_in_record = add_offset_in_record
+ @parser = parser
@add_prefix = add_prefix
@add_suffix = add_suffix
@options = options
@offset_manager = offset_manager
@router = router
@next_offset = @topic_entry.offset
if @topic_entry.offset == -1 && offset_manager
@next_offset = offset_manager.next_offset
end
- @consumer = create_consumer(@next_offset)
+ @fetch_args = {
+ topic: @topic_entry.topic,
+ partition: @topic_entry.partition,
+ }.merge(@options)
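+      # Fetch arguments are fixed per watcher; only the advancing :offset is
+      # filled in on each poll (see #consume).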
super(interval, true)
end
def on_timer
@callback.call
- rescue
- # TODO log?
- $log.error $!.to_s
- $log.error_backtrace
+ rescue => e
+ # TODO log?
+ $log.error e.to_s
+ $log.error_backtrace
end
def consume
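+      # One blocking poll per timer tick: ruby-kafka waits up to
+      # max_wait_time seconds for at least min_bytes before returning.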
+ offset = @next_offset
+ @fetch_args[:offset] = offset
+ messages = @kafka.fetch_messages(@fetch_args)
+
+ return if messages.size.zero?
+
es = MultiEventStream.new
tag = @topic_entry.topic
tag = @add_prefix + "." + tag if @add_prefix
tag = tag + "." + @add_suffix if @add_suffix
- if @offset_manager && @consumer.next_offset != @next_offset
- @consumer = create_consumer(@next_offset)
- end
-
- @consumer.fetch.each { |msg|
+ messages.each { |msg|
begin
- msg_record = parse_line(msg.value)
- msg_record = decorate_offset(msg_record, msg.offset) if @add_offset_in_record
- es.add(Engine.now, msg_record)
- rescue
- $log.warn msg_record.to_s, :error=>$!.to_s
+ es.add(Engine.now, @parser.call(msg, @topic_entry))
+ rescue => e
+ $log.warn "parser error in #{@topic_entry.topic}/#{@topic_entry.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset
$log.debug_backtrace
end
}
+ offset = messages.last.offset + 1
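+      # NOTE: @next_offset is only advanced below when something was emitted,
+      # so a batch whose every record fails to parse is fetched again.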
unless es.empty?
@router.emit_stream(tag, es)
if @offset_manager
- next_offset = @consumer.next_offset
- @offset_manager.save_offset(next_offset)
- @next_offset = next_offset
+ @offset_manager.save_offset(offset)
end
+ @next_offset = offset
end
- end
-
- def create_consumer(offset)
- @consumer.close if @consumer
- Poseidon::PartitionConsumer.new(
- @client_id, # client_id
- @host, # host
- @port, # port
- @topic_entry.topic, # topic
- @topic_entry.partition, # partition
- offset, # offset
- @options # options
- )
- end
-
- def parse_line(record)
- case @format
- when 'json'
- Yajl::Parser.parse(record)
- when 'ltsv'
- LTSV.parse(record)
- when 'msgpack'
- MessagePack.unpack(record)
- when 'text'
- {@message_key => record}
- end
- end
-
- def decorate_offset(record, offset)
- case @format
- when 'json'
- add_offset_in_hash(record, @topic_entry.topic, @topic_entry.partition, offset)
- when 'ltsv'
- record.each { |line|
- add_offset_in_hash(line, @topic_entry.topic, @topic_entry.partition, offset)
- }
- when 'msgpack'
- add_offset_in_hash(record, @topic_entry.topic, @topic_entry.partition, offset)
- when 'text'
- add_offset_in_hash(record, @topic_entry.topic, @topic_entry.partition, offset)
- end
- record
- end
-
- def add_offset_in_hash(hash, topic, partition, offset)
- hash['kafka_topic'] = topic
- hash['kafka_partition'] = partition
- hash['kafka_offset'] = offset
end
end
class TopicEntry
def initialize(topic, partition, offset)