lib/fluent/plugin/out_scribe.rb in fluent-plugin-scribe-0.10.8 vs lib/fluent/plugin/out_scribe.rb in fluent-plugin-scribe-0.10.9

- old
+ new

@@ -23,13 +23,13 @@ config_param :host, :string, :default => 'localhost' config_param :port, :integer, :default => 1463 config_param :field_ref, :string, :default => 'message' config_param :timeout, :integer, :default => 30 - config_param :remove_prefix, :string, :default => nil - config_param :add_newline, :bool, :default => false - config_param :default_category, :string, :default => 'unknown' + config_param :remove_prefix, :string, :default => nil + config_param :add_newline, :bool, :default => false + config_param :default_category, :string, :default => 'unknown' def initialize require 'thrift' $:.unshift File.join(File.dirname(__FILE__), 'thrift') require 'fb303_types' @@ -40,10 +40,13 @@ require 'scribe' super end def configure(conf) + # override default buffer_chunk_limit + conf['buffer_chunk_limit'] ||= '1m' + super end def start super @@ -67,41 +70,35 @@ [tag, record].to_msgpack end end def write(chunk) - records = [] - chunk.msgpack_each { |arr| - records << arr - } - socket = Thrift::Socket.new @host, @port, @timeout transport = Thrift::FramedTransport.new socket protocol = Thrift::BinaryProtocol.new transport, false, false client = Scribe::Client.new protocol transport.open begin entries = [] - if @add_newline - records.each { |r| - tag, record = r - next unless record.has_key?(@field_ref) - entry = LogEntry.new - entry.category = tag + + chunk.msgpack_each do |arr| + tag, record = arr + next unless record.has_key?(@field_ref) + + entry = LogEntry.new + entry.category = tag + + if @add_newline entry.message = (record[@field_ref] + "\n").force_encoding('ASCII-8BIT') - entries << entry - } - else - records.each { |r| - tag, record = r - next unless record.has_key?(@field_ref) - entry = LogEntry.new - entry.category = tag + else entry.message = record[@field_ref].force_encoding('ASCII-8BIT') - entries << entry - } + end + + entries << entry end + + $log.info "Writing #{entries.count} entries to scribe" client.Log(entries) ensure transport.close end end