lib/fluent/plugin/in_protobuf_http.rb in fluent-plugin-protobuf-http-0.2.0 vs lib/fluent/plugin/in_protobuf_http.rb in fluent-plugin-protobuf-http-0.3.0

- old
+ new

@@ -1,5 +1,7 @@ +# frozen-string-literal: true + # # Copyright 2020 Azeem Sajid # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,13 +18,15 @@ require 'fluent/plugin/input' require 'fluent/config/error' require 'fluent/plugin_helper/http_server' require 'webrick/httputils' require 'json' +require 'English' module Fluent module Plugin + # Implementation of HTTP input plugin for Protobuf class ProtobufHttpInput < Fluent::Plugin::Input Fluent::Plugin.register_input('protobuf_http', self) helpers :http_server, :event_emitter @@ -44,14 +48,10 @@ config_section :transport, required: false, multi: false, init: true, param_name: :transport_config do config_argument :protocol, :enum, list: %i[tcp tls], default: :tcp end - config_section :transform, required: false, multi: false, init: true, param_name: :transform_config do - config_argument :msgtype, :string - end - def initialize super @protos = [] # list of *.proto files @compiled_protos = [] # list of compiled protos i.e. *_pb.rb files @@ -59,20 +59,20 @@ end def compile_protos log.debug("Checking proto_dir [#{@proto_dir}]...") - path = File.expand_path(@proto_dir).freeze + path = File.expand_path(@proto_dir) raise Fluent::ConfigError, "protos_dir does not exist! [#{path}]" unless Dir.exist?(path) - @protos = Dir["#{path}/*.proto"].freeze + @protos = Dir["#{path}/*.proto"] raise Fluent::ConfigError, "Empty proto_dir! [#{path}]" unless @protos.any? log.info("Compiling .proto files [#{@protos.length}]...") `protoc --ruby_out=#{path} --proto_path=#{path} #{path}/*.proto` - raise Fluent::ConfigError, 'Could not compile! See error(s) above.' unless $?.success? + raise Fluent::ConfigError, 'Could not compile! See error(s) above.' unless $CHILD_STATUS.success? log.info("Compiled successfully:\n- #{@protos.join("\n- ")}") @protos.each do |proto| @compiled_protos.push(get_compiled_proto(proto)) @@ -80,12 +80,12 @@ log.info("Compiled .proto files:\n- #{@compiled_protos.join("\n- ")}") end def get_compiled_proto(proto) - proto_suffix = '.proto'.freeze - compiled_proto_suffix = '_pb.rb'.freeze + proto_suffix = '.proto' + compiled_proto_suffix = '_pb.rb' compiled_proto = proto.chomp(proto_suffix) + compiled_proto_suffix raise Fluent::ConfigError, "Compiled proto not found! [#{compiled_proto}]" unless File.file?(compiled_proto) compiled_proto @@ -115,11 +115,11 @@ def get_msg_types(compiled_proto) log.debug("Extracting message types [#{compiled_proto}]...") msg_types = [] File.foreach(compiled_proto) do |line| if line.lstrip.start_with?('add_message') - msg_type = line[/"([^"]*)"/, 1].freeze # regex: <add_message> 'msg_type' <do> + msg_type = line[/"([^"]*)"/, 1] # regex: <add_message> 'msg_type' <do> msg_types.push(msg_type) unless msg_type.nil? end end if msg_types.any? @@ -150,46 +150,44 @@ if @transport_config && @transport_config.protocol == :tls proto = :tls tls_opts = @transport_config.to_h end - log.warn("#{@transform_config.to_h}") - log.info("Starting protobuf #{proto == :tcp ? 'HTTP' : 'HTTPS'} server [#{@bind}:#{@port}]...") log.debug("TLS configuration:\n#{tls_opts}") if tls_opts http_server_create_http_server(:protobuf_server, addr: @bind, port: @port, logger: log, proto: proto, tls_opts: tls_opts) do |server| server.post("/#{tag}") do |req| - peeraddr = "#{req.peeraddr[2]}:#{req.peeraddr[1]}".freeze # ip:port - serialized_msg = req.body.freeze + peeraddr = "#{req.peeraddr[2]}:#{req.peeraddr[1]}" # ip:port + serialized_msg = req.body log.info("[R] {#{@in_mode}} [#{peeraddr}, size: #{serialized_msg.length} bytes]") log.debug("Dumping serialized message [#{serialized_msg.length} bytes]:\n#{serialized_msg}") content_type = req.header['content-type'][0] unless valid_content_type?(content_type) - status = "Invalid 'Content-Type' header! [#{content_type}]".freeze + status = "Invalid 'Content-Type' header! [#{content_type}]" log.warn("[X] Message rejected! [#{peeraddr}] #{status}") next [400, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json] end log.debug("[>] Content-Type: #{content_type}") msgtype, batch = get_query_params(req.query_string) unless @msgclass_lookup.key?(msgtype) - status = "Invalid 'msgtype' in 'query_string'! [#{msgtype}]".freeze + status = "Invalid 'msgtype' in 'query_string'! [#{msgtype}]" log.warn("[X] Message rejected! [#{peeraddr}] #{status}") next [400, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json] end log.debug("[>] Query parameters: [msgtype: #{content_type}, batch: #{batch}]") deserialized_msg = deserialize_msg(msgtype, serialized_msg) if deserialized_msg.nil? - status = "Incompatible message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]".freeze + status = "Incompatible message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]" log.warn("[X] Message rejected! [#{peeraddr}] #{status}") next [400, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json] end is_batch = !batch.nil? && batch == 'true' @@ -212,11 +210,11 @@ # Log batch messages log.info("[B] {#{@in_mode}} [#{peeraddr}, msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]") if deserialized_msg.type.nil? || deserialized_msg.batch.nil? || deserialized_msg.batch.empty? - status = "Invalid 'batch' message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]".freeze + status = "Invalid 'batch' message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes]" log.warn("[X] Message rejected! [#{peeraddr}] #{status}") next [400, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json] end batch_type = deserialized_msg.type @@ -232,20 +230,20 @@ stream.add(time, record) end router.emit_stream(@tag, stream) - status = "Batch received! [batch_type: #{batch_type}, batch_size: #{batch_size} messages]".freeze + status = "Batch received! [batch_type: #{batch_type}, batch_size: #{batch_size} messages]" log.info("[B] {#{@out_mode}} [#{peeraddr}, msgtype: #{msgtype}] #{status}") [200, { 'Content-Type' => 'application/json', 'Connection' => 'close' }, { 'status' => status }.to_json] end end end def valid_content_type?(content_type) - hdr_binary = 'application/octet-stream'.freeze - hdr_json = 'application/json'.freeze + hdr_binary = 'application/octet-stream' + hdr_json = 'application/json' case @in_mode when :binary content_type == hdr_binary when :json @@ -282,11 +280,11 @@ msgclass.decode_json(serialized_msg) end rescue Google::Protobuf::ParseError => e log.error("Incompatible message! [msgtype: #{msgtype}, size: #{serialized_msg.length} bytes] #{e}") nil - rescue => e + rescue StandardError => e log.error("Deserializaton failed! Error: #{e}") nil end end @@ -298,10 +296,10 @@ when :binary msgclass.encode(deserialized_msg) when :json msgclass.encode_json(deserialized_msg) end - rescue => e + rescue StandardError => e log.error("Serialization failed! [msgtype: #{msgtype}, msg: #{deserialized_msg}] Error: #{e}") nil end end