lib/fluent/plugin/out_scribe.rb in fluent-plugin-scribe-0.10.8 vs lib/fluent/plugin/out_scribe.rb in fluent-plugin-scribe-0.10.9
- old
+ new
@@ -23,13 +23,13 @@
config_param :host, :string, :default => 'localhost'
config_param :port, :integer, :default => 1463
config_param :field_ref, :string, :default => 'message'
config_param :timeout, :integer, :default => 30
- config_param :remove_prefix, :string, :default => nil
- config_param :add_newline, :bool, :default => false
- config_param :default_category, :string, :default => 'unknown'
+ config_param :remove_prefix, :string, :default => nil
+ config_param :add_newline, :bool, :default => false
+ config_param :default_category, :string, :default => 'unknown'
def initialize
require 'thrift'
$:.unshift File.join(File.dirname(__FILE__), 'thrift')
require 'fb303_types'
@@ -40,10 +40,13 @@
require 'scribe'
super
end
def configure(conf)
+ # override default buffer_chunk_limit
+ conf['buffer_chunk_limit'] ||= '1m'
+
super
end
def start
super
@@ -67,41 +70,35 @@
[tag, record].to_msgpack
end
end
def write(chunk)
- records = []
- chunk.msgpack_each { |arr|
- records << arr
- }
-
socket = Thrift::Socket.new @host, @port, @timeout
transport = Thrift::FramedTransport.new socket
protocol = Thrift::BinaryProtocol.new transport, false, false
client = Scribe::Client.new protocol
transport.open
begin
entries = []
- if @add_newline
- records.each { |r|
- tag, record = r
- next unless record.has_key?(@field_ref)
- entry = LogEntry.new
- entry.category = tag
+
+ chunk.msgpack_each do |arr|
+ tag, record = arr
+ next unless record.has_key?(@field_ref)
+
+ entry = LogEntry.new
+ entry.category = tag
+
+ if @add_newline
entry.message = (record[@field_ref] + "\n").force_encoding('ASCII-8BIT')
- entries << entry
- }
- else
- records.each { |r|
- tag, record = r
- next unless record.has_key?(@field_ref)
- entry = LogEntry.new
- entry.category = tag
+ else
entry.message = record[@field_ref].force_encoding('ASCII-8BIT')
- entries << entry
- }
+ end
+
+ entries << entry
end
+
+ $log.info "Writing #{entries.count} entries to scribe"
client.Log(entries)
ensure
transport.close
end
end