lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.3.4.pre vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-0.3.4
- old
+ new
@@ -38,10 +38,12 @@
config_param :return_immediately, :bool, default: true
desc 'Set number of threads to pull messages.'
config_param :pull_threads, :integer, default: 1
desc 'Set input format.'
config_param :format, :string, default: 'json'
+ desc 'Set error type when parsing messages fails.'
+ config_param :parse_error_action, :enum, default: :exception, list: [:exception, :warning]
# for HTTP RPC
desc 'If `true` is specified, HTTP RPC to stop or start pulling message is enabled.'
config_param :enable_rpc, :bool, default: false
desc 'Bind IP address for HTTP RPC.'
config_param :rpc_bind, :string, default: '0.0.0.0'
@@ -189,10 +191,12 @@
process messages
@subscriber.acknowledge messages
log.debug "#{messages.length} message(s) processed"
+ rescue Fluent::GcloudPubSub::RetryableError => ex
+ log.warn "Retryable error occurs. Fluentd will retry.", error_message: ex.to_s, error_class: ex.class.to_s
rescue => ex
log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s
log.error_backtrace ex.backtrace
end
@@ -202,10 +206,15 @@
line = m.message.data.chomp
@parser.parse(line) do |time, record|
if time && record
es.add(time, record)
else
- raise FailedParseError.new "pattern not match: #{line.inspect}"
+ case @parse_error_action
+ when :exception
+ raise FailedParseError.new "pattern not match: #{line.inspect}"
+ else
+ log.warn 'pattern not match', record: line.inspect
+ end
end
end
end
# There are some output plugins not to supposed to be called with multi-threading.