lib/fluent/plugin/in_unix_client.rb in fluent-plugin-unix-client-0.1.0 vs lib/fluent/plugin/in_unix_client.rb in fluent-plugin-unix-client-1.0.0

- old
+ new

@@ -13,10 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. require "fluent/plugin/input" require 'socket' +require "json" module Fluent module Plugin class UnixClientInput < Fluent::Plugin::Input Fluent::Plugin.register_input("unix_client", self) @@ -27,15 +28,23 @@ config_param :tag, :string desc 'The path to Unix Domain Socket.' config_param :path, :string desc 'The payload is read up to this character.' config_param :delimiter, :string, default: "\n" + desc "When recieved JSON data splitted by the delimiter is not completed, like '[{...},'," \ + " trim '[', ']' and ',' characters to format." + config_param :format_json, :bool, default: false def configure(conf) super @parser = parser_create - @socket_handler = SocketHandler.new(@path, delimiter: @delimiter, log: log) + @socket_handler = SocketHandler.new( + @path, + delimiter: @delimiter, + format_json: @format_json, + log: log, + ) end def start super thread_create(:in_unix_client, &method(:keep_receiving)) @@ -58,25 +67,38 @@ raw_records = @socket_handler.try_receive return if raw_records.nil? || raw_records.empty? raw_records.each do |raw_record| @parser.parse(raw_record) do |time, record| - router.emit(@tag, time, record) + emit_one_parsed(time, record) end end end + + def emit_one_parsed(time, record) + case record + when Array + es = Fluent::MultiEventStream.new + record.each do |e| + es.add(time, e) + end + router.emit_stream(@tag, es) + else + router.emit(@tag, time, record) + end + end end class SocketHandler MAX_LENGTH_RECEIVE_ONCE = 10000 - def initialize(path, delimiter: "\n", log: nil) + def initialize(path, delimiter: "\n", format_json: false, log: nil) @path = path @log = log @socket = nil - @buf = Buffer.new(delimiter) + @buf = Buffer.new(delimiter, format_json: format_json) end def connected? !@socket.nil? end @@ -140,13 +162,14 @@ end end class Buffer - def initialize(delimiter) + def initialize(delimiter, format_json: false) @buf = "" @delimiter = delimiter + @format_json = format_json end def add(data) @buf << data end @@ -158,16 +181,52 @@ def extract_records records = [] pos_read = 0 while pos_next_delimiter = @buf.index(@delimiter, pos_read) - records << @buf[pos_read...pos_next_delimiter] + fixed = fix_format(@buf[pos_read...pos_next_delimiter]) + records << fixed unless fixed.empty? pos_read = pos_next_delimiter + @delimiter.size end @buf.slice!(0, pos_read) if pos_read > 0 records + end + + private + + def fix_format(record) + return record if record.empty? + return record unless @format_json + + fix_uncompleted_json(record) + end + + def fix_uncompleted_json(record) + return record if is_correct_json(record) + + # Assume uncompleted JSON such as "[{...},", "{...},", or "{...}]" + + if record[0] == "[" + record.slice!(0) + return record if record.empty? + end + + if record[-1] == "," || record[-1] == "]" + record.slice!(-1) + return record if record.empty? + end + + record + end + + def is_correct_json(record) + # Just to check the format + JSON.parse(record) + return true + rescue JSON::ParserError + return false end end end end