lib/fluent/plugin/in_unix_client.rb in fluent-plugin-unix-client-0.1.0 vs lib/fluent/plugin/in_unix_client.rb in fluent-plugin-unix-client-1.0.0
- old
+ new
@@ -13,10 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
require "fluent/plugin/input"
require 'socket'
+require "json"
module Fluent
module Plugin
class UnixClientInput < Fluent::Plugin::Input
Fluent::Plugin.register_input("unix_client", self)
@@ -27,15 +28,23 @@
config_param :tag, :string
desc 'The path to Unix Domain Socket.'
config_param :path, :string
desc 'The payload is read up to this character.'
config_param :delimiter, :string, default: "\n"
+ desc "When recieved JSON data splitted by the delimiter is not completed, like '[{...},'," \
+ " trim '[', ']' and ',' characters to format."
+ config_param :format_json, :bool, default: false
def configure(conf)
super
@parser = parser_create
- @socket_handler = SocketHandler.new(@path, delimiter: @delimiter, log: log)
+ @socket_handler = SocketHandler.new(
+ @path,
+ delimiter: @delimiter,
+ format_json: @format_json,
+ log: log,
+ )
end
def start
super
thread_create(:in_unix_client, &method(:keep_receiving))
@@ -58,25 +67,38 @@
raw_records = @socket_handler.try_receive
return if raw_records.nil? || raw_records.empty?
raw_records.each do |raw_record|
@parser.parse(raw_record) do |time, record|
- router.emit(@tag, time, record)
+ emit_one_parsed(time, record)
end
end
end
+
+ def emit_one_parsed(time, record)
+ case record
+ when Array
+ es = Fluent::MultiEventStream.new
+ record.each do |e|
+ es.add(time, e)
+ end
+ router.emit_stream(@tag, es)
+ else
+ router.emit(@tag, time, record)
+ end
+ end
end
class SocketHandler
MAX_LENGTH_RECEIVE_ONCE = 10000
- def initialize(path, delimiter: "\n", log: nil)
+ def initialize(path, delimiter: "\n", format_json: false, log: nil)
@path = path
@log = log
@socket = nil
- @buf = Buffer.new(delimiter)
+ @buf = Buffer.new(delimiter, format_json: format_json)
end
def connected?
!@socket.nil?
end
@@ -140,13 +162,14 @@
end
end
class Buffer
- def initialize(delimiter)
+ def initialize(delimiter, format_json: false)
@buf = ""
@delimiter = delimiter
+ @format_json = format_json
end
def add(data)
@buf << data
end
@@ -158,16 +181,52 @@
def extract_records
records = []
pos_read = 0
while pos_next_delimiter = @buf.index(@delimiter, pos_read)
- records << @buf[pos_read...pos_next_delimiter]
+ fixed = fix_format(@buf[pos_read...pos_next_delimiter])
+ records << fixed unless fixed.empty?
pos_read = pos_next_delimiter + @delimiter.size
end
@buf.slice!(0, pos_read) if pos_read > 0
records
+ end
+
+ private
+
+ def fix_format(record)
+ return record if record.empty?
+ return record unless @format_json
+
+ fix_uncompleted_json(record)
+ end
+
+ def fix_uncompleted_json(record)
+ return record if is_correct_json(record)
+
+ # Assume uncompleted JSON such as "[{...},", "{...},", or "{...}]"
+
+ if record[0] == "["
+ record.slice!(0)
+ return record if record.empty?
+ end
+
+ if record[-1] == "," || record[-1] == "]"
+ record.slice!(-1)
+ return record if record.empty?
+ end
+
+ record
+ end
+
+ def is_correct_json(record)
+ # Just to check the format
+ JSON.parse(record)
+ return true
+ rescue JSON::ParserError
+ return false
end
end
end
end