lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.2.0 vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.3.0
- old
+ new
@@ -35,10 +35,12 @@
config_param :max_messages, :integer, default: 100
desc 'Setting `true`, keepalive connection to wait for new messages.'
config_param :return_immediately, :bool, default: true
desc 'Set number of threads to pull messages.'
config_param :pull_threads, :integer, default: 1
+ desc 'Specify the key of the attribute to be acquired as a record'
+ config_param :attribute_keys, :array, default: []
desc 'Set error type when parsing messages fails.'
config_param :parse_error_action, :enum, default: :exception, list: [:exception, :warning]
# for HTTP RPC
desc 'If `true` is specified, HTTP RPC to stop or start pulling message is enabled.'
config_param :enable_rpc, :bool, default: false
@@ -218,11 +220,16 @@
hsh[key] = Fluent::MultiEventStream.new
end
messages.each do |m|
line = m.message.data.chomp
+ attributes = m.attributes
@parser.parse(line) do |time, record|
if time && record
+ @attribute_keys.each do |key|
+ record[key] = attributes[key]
+ end
+
event_streams[@extract_tag.call(record)].add(time, record)
else
case @parse_error_action
when :exception
raise FailedParseError.new "pattern not match: #{line.inspect}"