lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.2.0 vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.3.0

- old
+ new

@@ -35,10 +35,12 @@ config_param :max_messages, :integer, default: 100 desc 'Setting `true`, keepalive connection to wait for new messages.' config_param :return_immediately, :bool, default: true desc 'Set number of threads to pull messages.' config_param :pull_threads, :integer, default: 1 + desc 'Specify the key of the attribute to be acquired as a record' + config_param :attribute_keys, :array, default: [] desc 'Set error type when parsing messages fails.' config_param :parse_error_action, :enum, default: :exception, list: [:exception, :warning] # for HTTP RPC desc 'If `true` is specified, HTTP RPC to stop or start pulling message is enabled.' config_param :enable_rpc, :bool, default: false @@ -218,11 +220,16 @@ hsh[key] = Fluent::MultiEventStream.new end messages.each do |m| line = m.message.data.chomp + attributes = m.attributes @parser.parse(line) do |time, record| if time && record + @attribute_keys.each do |key| + record[key] = attributes[key] + end + event_streams[@extract_tag.call(record)].add(time, record) else case @parse_error_action when :exception raise FailedParseError.new "pattern not match: #{line.inspect}"