lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.2.0 vs lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.3.0

- old
+ new

@@ -25,10 +25,12 @@ config_param :max_messages, :integer, :default => 1000 desc 'Publishing messages bytesize per request to Cloud Pub/Sub.' config_param :max_total_size, :integer, :default => 9800000 # 9.8MB desc 'Limit bytesize per message.' config_param :max_message_size, :integer, :default => 4000000 # 4MB + desc 'Publishing the set field as an attribute' + config_param :attribute_keys, :array, :default => [] config_section :buffer do config_set_default :@type, DEFAULT_BUFFER_TYPE end @@ -48,11 +50,15 @@ @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic end def format(tag, time, record) record = inject_values_to_record(tag, time, record) - @formatter.format(tag, time, record).to_msgpack + attributes = {} + @attribute_keys.each do |key| + attributes[key] = record.delete(key) + end + [@formatter.format(tag, time, record), attributes].to_msgpack end def formatted_to_msgpack_binary? true end @@ -65,10 +71,11 @@ topic = extract_placeholders(@topic, chunk.metadata) messages = [] size = 0 - chunk.msgpack_each do |msg| + chunk.msgpack_each do |msg, attr| + msg = Fluent::GcloudPubSub::Message.new(msg, attr) if msg.bytesize > @max_message_size log.warn 'Drop a message because its size exceeds `max_message_size`', size: msg.bytesize next end if messages.length + 1 > @max_messages || size + msg.bytesize > @max_total_size