lib/fluent/plugin/parser_protobuf.rb in fluent-plugin-parser-protobuf-0.1.1 vs lib/fluent/plugin/parser_protobuf.rb in fluent-plugin-parser-protobuf-0.1.2

- old
+ new

@@ -9,44 +9,66 @@ module Fluent module Plugin class ProtobufParser < Parser Plugin.register_parser('protobuf', self) - class Error < StandardError; end - config_param :include_paths, :array, default: [] config_param :class_file, :string, default: nil config_param :protobuf_version, :enum, list: [:protobuf2, :protobuf3], default: :protobuf3 config_param :class_name, :string + config_param :suppress_decoding_error, :bool, default: false def configure(conf) super + + if !@include_paths.empty? && !@class_file.nil? + raise Fluent::ConfigError, "Cannot use `include_paths` and `class_file` at the same time." + end + + if @include_paths.empty? && @class_file.nil? + raise Fluent::ConfigError, "Need to specify `include_paths` or `class_file`." + end + loading_required = Google::Protobuf::DescriptorPool.generated_pool.lookup(class_name).nil? load_protobuf_class(@class_file) if loading_required && @class_file include_paths.each {|path| load_protobuf_definition(path)} if !include_paths.empty? && loading_required if @protobuf_version == :protobuf2 - @protobuf_descriptor = create_prptobuf2_instance(@class_name) + @protobuf_descriptor = create_protobuf2_instance(@class_name) elsif @protobuf_version == :protobuf3 @protobuf_descriptor = Google::Protobuf::DescriptorPool.generated_pool.lookup(@class_name).msgclass end end + def parser_type + :binary + end + def parse(binary) - if @protobuf_version == :protobuf3 - decoded = @protobuf_descriptor.decode(binary.to_s) - time = @estimate_current_event ? Fluent::EventTime.now : nil - yield time, decoded.to_h - elsif @protobuf_version == :protobuf2 - decoded = @protobuf_descriptor.parse(binary.to_s) - time = @estimate_current_event ? Fluent::EventTime.now : nil - yield time, decoded.to_hash + begin + if @protobuf_version == :protobuf3 + decoded = @protobuf_descriptor.decode(binary.to_s) + time = @estimate_current_event ? Fluent::EventTime.now : nil + yield time, decoded.to_h + elsif @protobuf_version == :protobuf2 + decoded = @protobuf_descriptor.parse(binary.to_s) + time = @estimate_current_event ? Fluent::EventTime.now : nil + yield time, decoded.to_hash + end + rescue => e + log.warn("Couldn't decode protobuf: #{e.inspect}, message: #{binary}") + if @suppress_decoding_error + yield nil, nil + else + raise e + end end end + alias parse_partial_data parse - def create_prptobuf2_instance(class_name) + def create_protobuf2_instance(class_name) unless Object.const_defined?(class_name) raise Fluent::ConfigError, "Cannot find class #{class_name}." else Object.const_get(class_name) end