lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.3.4.pre vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.3.4

- old
+ new

@@ -38,10 +38,12 @@ config_param :return_immediately, :bool, default: true desc 'Set number of threads to pull messages.' config_param :pull_threads, :integer, default: 1 desc 'Set input format.' config_param :format, :string, default: 'json' + desc 'Set error type when parsing messages fails.' + config_param :parse_error_action, :enum, default: :exception, list: [:exception, :warning] # for HTTP RPC desc 'If `true` is specified, HTTP RPC to stop or start pulling message is enabled.' config_param :enable_rpc, :bool, default: false desc 'Bind IP address for HTTP RPC.' config_param :rpc_bind, :string, default: '0.0.0.0' @@ -189,10 +191,12 @@ process messages @subscriber.acknowledge messages log.debug "#{messages.length} message(s) processed" + rescue Fluent::GcloudPubSub::RetryableError => ex + log.warn "Retryable error occurs. Fluentd will retry.", error_message: ex.to_s, error_class: ex.class.to_s rescue => ex log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s log.error_backtrace ex.backtrace end @@ -202,10 +206,15 @@ line = m.message.data.chomp @parser.parse(line) do |time, record| if time && record es.add(time, record) else - raise FailedParseError.new "pattern not match: #{line.inspect}" + case @parse_error_action + when :exception + raise FailedParseError.new "pattern not match: #{line.inspect}" + else + log.warn 'pattern not match', record: line.inspect + end end end end # There are some output plugins not to supposed to be called with multi-threading.