lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.1.0 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.1.1
- old
+ new
@@ -23,10 +23,14 @@
config_param :max_bytes, :integer, :default => nil
config_param :max_wait_ms, :integer, :default => nil
config_param :min_bytes, :integer, :default => nil
config_param :socket_timeout_ms, :integer, :default => nil
+ unless method_defined?(:router)
+ define_method("router") { Fluent::Engine }
+ end
+
def initialize
super
require 'poseidon'
require 'zookeeper'
end
@@ -86,10 +90,11 @@
@message_key,
@add_offset_in_record,
@add_prefix,
@add_suffix,
offset_manager,
+ router,
opt)
}
@topic_watchers.each {|tw|
tw.attach(@loop)
}
@@ -107,11 +112,11 @@
$log.error "unexpected error", :error=>$!.to_s
$log.error_backtrace
end
class TopicWatcher < Coolio::TimerWatcher
- def initialize(topic_entry, host, port, client_id, interval, format, message_key, add_offset_in_record, add_prefix, add_suffix, offset_manager, options={})
+ def initialize(topic_entry, host, port, client_id, interval, format, message_key, add_offset_in_record, add_prefix, add_suffix, offset_manager, router, options={})
@topic_entry = topic_entry
@host = host
@port = port
@client_id = client_id
@callback = method(:consume)
@@ -120,10 +125,11 @@
@add_offset_in_record = add_offset_in_record
@add_prefix = add_prefix
@add_suffix = add_suffix
@options = options
@offset_manager = offset_manager
+ @router = router
@next_offset = @topic_entry.offset
if @topic_entry.offset == -1 && offset_manager
@next_offset = offset_manager.next_offset
end
@@ -152,19 +158,19 @@
@consumer.fetch.each { |msg|
begin
msg_record = parse_line(msg.value)
msg_record = decorate_offset(msg_record, msg.offset) if @add_offset_in_record
- es.add(Time.now.to_i, msg_record)
+ es.add(Engine.now, msg_record)
rescue
$log.warn msg_record.to_s, :error=>$!.to_s
$log.debug_backtrace
end
}
unless es.empty?
- Engine.emit_stream(tag, es)
+ @router.emit_stream(tag, es)
if @offset_manager
next_offset = @consumer.next_offset
@offset_manager.save_offset(next_offset)
@next_offset = next_offset
@@ -184,21 +190,19 @@
@options # options
)
end
def parse_line(record)
- parsed_record = {}
case @format
when 'json'
- parsed_record = Yajl::Parser.parse(record)
+ Yajl::Parser.parse(record)
when 'ltsv'
- parsed_record = LTSV.parse(record)
+ LTSV.parse(record)
when 'msgpack'
- parsed_record = MessagePack.unpack(record)
+ MessagePack.unpack(record)
when 'text'
- parsed_record[@message_key] = record
+ {@message_key => record}
end
- parsed_record
end
def decorate_offset(record, offset)
case @format
when 'json'