lib/fluent/plugin/in_protobuf_http.rb in fluent-plugin-protobuf-http-0.2.0 vs lib/fluent/plugin/in_protobuf_http.rb in fluent-plugin-protobuf-http-0.3.0
- old
+ new
@@ -1,5 +1,7 @@
+# frozen-string-literal: true
+
#
# Copyright 2020 Azeem Sajid
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,13 +18,15 @@
require 'fluent/plugin/input'
require 'fluent/config/error'
require 'fluent/plugin_helper/http_server'
require 'webrick/httputils'
require 'json'
+require 'English'
module Fluent
module Plugin
+ # Implementation of HTTP input plugin for Protobuf
class ProtobufHttpInput < Fluent::Plugin::Input
Fluent::Plugin.register_input('protobuf_http', self)
helpers :http_server, :event_emitter
@@ -44,14 +48,10 @@
config_section :transport, required: false, multi: false, init: true, param_name: :transport_config do
config_argument :protocol, :enum, list: %i[tcp tls], default: :tcp
end
- config_section :transform, required: false, multi: false, init: true, param_name: :transform_config do
- config_argument :msgtype, :string
- end
-
def initialize
super
@protos = [] # list of *.proto files
@compiled_protos = [] # list of compiled protos i.e. *_pb.rb files
@@ -59,20 +59,20 @@
end
def compile_protos
log.debug("Checking proto_dir [#{@proto_dir}]...")
- path = File.expand_path(@proto_dir).freeze
+ path = File.expand_path(@proto_dir)
raise Fluent::ConfigError, "protos_dir does not exist! [#{path}]" unless Dir.exist?(path)
- @protos = Dir["#{path}/*.proto"].freeze
+ @protos = Dir["#{path}/*.proto"]
raise Fluent::ConfigError, "Empty proto_dir! [#{path}]" unless @protos.any?
log.info("Compiling .proto files [#{@protos.length}]...")
`protoc --ruby_out=#{path} --proto_path=#{path} #{path}/*.proto`
- raise Fluent::ConfigError, 'Could not compile! See error(s) above.' unless $?.success?
+ raise Fluent::ConfigError, 'Could not compile! See error(s) above.' unless $CHILD_STATUS.success?
log.info("Compiled successfully:\n- #{@protos.join("\n- ")}")
@protos.each do |proto|
@compiled_protos.push(get_compiled_proto(proto))
@@ -80,12 +80,12 @@
log.info("Compiled .proto files:\n- #{@compiled_protos.join("\n- ")}")
end
def get_compiled_proto(proto)
- proto_suffix = '.proto'.freeze
- compiled_proto_suffix = '_pb.rb'.freeze
+ proto_suffix = '.proto'
+ compiled_proto_suffix = '_pb.rb'
compiled_proto = proto.chomp(proto_suffix) + compiled_proto_suffix
raise Fluent::ConfigError, "Compiled proto not found! [#{compiled_proto}]" unless File.file?(compiled_proto)
compiled_proto
@@ -115,11 +115,11 @@
def get_msg_types(compiled_proto)
log.debug("Extracting message types [#{compiled_proto}]...")
msg_types = []
File.foreach(compiled_proto) do |line|
if line.lstrip.start_with?('add_message')
- msg_type = line[/"([^"]*)"/, 1].freeze # regex: <add_message> 'msg_type' <do>
+ msg_type = line[/"([^"]*)"/, 1] # regex: <add_message> 'msg_type' <do>
msg_types.push(msg_type) unless msg_type.nil?
end
end
if msg_types.any?
@@ -150,46 +150,44 @@
if @transport_config && @transport_config.protocol == :tls
proto = :tls
tls_opts = @transport_config.to_h
end
- log.warn("#{@transform_config.to_h}")
-
log.info("Starting protobuf #{proto == :tcp ? 'HTTP' : 'HTTPS'} server [#{@bind}:#{@port}]...")
log.debug("TLS configuration:\n#{tls_opts}") if tls_opts
http_server_create_http_server(:protobuf_server, addr: @bind, port: @port, logger: log, proto: proto, tls_opts: tls_opts) do |server|
server.post("/#{tag}") do |req|
- peeraddr = "#{req.peeraddr[2]}:#{req.peeraddr[1]}".freeze # ip:port
- serialized_msg = req.body.freeze
+ peeraddr = "#{req.peeraddr[2]}:#{req.peeraddr[1]}" # ip:port
+ serialized_msg = req.body
log.info("[R] {#{@in_mode}} [#{peeraddr}, size: #{serialized_msg.length} bytes]")
log.debug("Dumping serialized message [#{serialized_msg.length} bytes]:\n#{serialized_msg}")
content_type = req.header['content-type'][0]
unless valid_content_type?(content_type)
- status = "Invalid 'Content-Type' header! [#{content_type}]".freeze
+ status = "Invalid 'Content-Type' header! [#{content_type}]"
log.warn("[X] Message rejected! [#{peeraddr}] #{status}")
next [400, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json]
end
log.debug("[>] Content-Type: #{content_type}")
msgtype, batch = get_query_params(req.query_string)
unless @msgclass_lookup.key?(msgtype)
- status = "Invalid 'msgtype' in 'query_string'! [#{msgtype}]".freeze
+ status = "Invalid 'msgtype' in 'query_string'! [#{msgtype}]"
log.warn("[X] Message rejected! [#{peeraddr}] #{status}")
next [400, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json]
end
log.debug("[>] Query parameters: [msgtype: #{content_type}, batch: #{batch}]")
deserialized_msg = deserialize_msg(msgtype, serialized_msg)
if deserialized_msg.nil?
- status = "Incompatible message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]".freeze
+ status = "Incompatible message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]"
log.warn("[X] Message rejected! [#{peeraddr}] #{status}")
next [400, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json]
end
is_batch = !batch.nil? && batch == 'true'
@@ -212,11 +210,11 @@
# Log batch messages
log.info("[B] {#{@in_mode}} [#{peeraddr}, msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]")
if deserialized_msg.type.nil? || deserialized_msg.batch.nil? || deserialized_msg.batch.empty?
- status = "Invalid 'batch' message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]".freeze
+ status = "Invalid 'batch' message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]"
log.warn("[X] Message rejected! [#{peeraddr}] #{status}")
next [400, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json]
end
batch_type = deserialized_msg.type
@@ -232,20 +230,20 @@
stream.add(time, record)
end
router.emit_stream(@tag, stream)
- status = "Batch received! [batch_type: #{batch_type}, batch_size: #{batch_size} messages]".freeze
+ status = "Batch received! [batch_type: #{batch_type}, batch_size: #{batch_size} messages]"
log.info("[B] {#{@out_mode}} [#{peeraddr}, msgtype: #{msgtype}] #{status}")
[200, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json]
end
end
end
def valid_content_type?(content_type)
- hdr_binary = 'application/octet-stream'.freeze
- hdr_json = 'application/json'.freeze
+ hdr_binary = 'application/octet-stream'
+ hdr_json = 'application/json'
case @in_mode
when :binary
content_type == hdr_binary
when :json
@@ -282,11 +280,11 @@
msgclass.decode_json(serialized_msg)
end
rescue Google::Protobuf::ParseError => e
log.error("Incompatible message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes] #{e}")
nil
- rescue => e
+ rescue StandardError => e
log.error("Deserializaton failed! Error: #{e}")
nil
end
end
@@ -298,10 +296,10 @@
when :binary
msgclass.encode(deserialized_msg)
when :json
msgclass.encode_json(deserialized_msg)
end
- rescue => e
+ rescue StandardError => e
log.error("Serialization failed! [msgtype: #{msgtype}, msg: #{deserialized_msg}] Error: #{e}")
nil
end
end