lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.1.0 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.1.1

- old
+ new

@@ -23,10 +23,14 @@ config_param :max_bytes, :integer, :default => nil config_param :max_wait_ms, :integer, :default => nil config_param :min_bytes, :integer, :default => nil config_param :socket_timeout_ms, :integer, :default => nil + unless method_defined?(:router) + define_method("router") { Fluent::Engine } + end + def initialize super require 'poseidon' require 'zookeeper' end @@ -86,10 +90,11 @@ @message_key, @add_offset_in_record, @add_prefix, @add_suffix, offset_manager, + router, opt) } @topic_watchers.each {|tw| tw.attach(@loop) } @@ -107,11 +112,11 @@ $log.error "unexpected error", :error=>$!.to_s $log.error_backtrace end class TopicWatcher < Coolio::TimerWatcher - def initialize(topic_entry, host, port, client_id, interval, format, message_key, add_offset_in_record, add_prefix, add_suffix, offset_manager, options={}) + def initialize(topic_entry, host, port, client_id, interval, format, message_key, add_offset_in_record, add_prefix, add_suffix, offset_manager, router, options={}) @topic_entry = topic_entry @host = host @port = port @client_id = client_id @callback = method(:consume) @@ -120,10 +125,11 @@ @add_offset_in_record = add_offset_in_record @add_prefix = add_prefix @add_suffix = add_suffix @options = options @offset_manager = offset_manager + @router = router @next_offset = @topic_entry.offset if @topic_entry.offset == -1 && offset_manager @next_offset = offset_manager.next_offset end @@ -152,19 +158,19 @@ @consumer.fetch.each { |msg| begin msg_record = parse_line(msg.value) msg_record = decorate_offset(msg_record, msg.offset) if @add_offset_in_record - es.add(Time.now.to_i, msg_record) + es.add(Engine.now, msg_record) rescue $log.warn msg_record.to_s, :error=>$!.to_s $log.debug_backtrace end } unless es.empty? - Engine.emit_stream(tag, es) + @router.emit_stream(tag, es) if @offset_manager next_offset = @consumer.next_offset @offset_manager.save_offset(next_offset) @next_offset = next_offset @@ -184,21 +190,19 @@ @options # options ) end def parse_line(record) - parsed_record = {} case @format when 'json' - parsed_record = Yajl::Parser.parse(record) + Yajl::Parser.parse(record) when 'ltsv' - parsed_record = LTSV.parse(record) + LTSV.parse(record) when 'msgpack' - parsed_record = MessagePack.unpack(record) + MessagePack.unpack(record) when 'text' - parsed_record[@message_key] = record + {@message_key => record} end - parsed_record end def decorate_offset(record, offset) case @format when 'json'