lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.2.0 vs lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.3.0
- old
+ new
@@ -25,10 +25,12 @@
config_param :max_messages, :integer, :default => 1000
desc 'Publishing messages bytesize per request to Cloud Pub/Sub.'
config_param :max_total_size, :integer, :default => 9800000 # 9.8MB
desc 'Limit bytesize per message.'
config_param :max_message_size, :integer, :default => 4000000 # 4MB
+ desc 'Publishing the set field as an attribute'
+ config_param :attribute_keys, :array, :default => []
config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
end
@@ -48,11 +50,15 @@
@publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic
end
def format(tag, time, record)
record = inject_values_to_record(tag, time, record)
- @formatter.format(tag, time, record).to_msgpack
+ attributes = {}
+ @attribute_keys.each do |key|
+ attributes[key] = record.delete(key)
+ end
+ [@formatter.format(tag, time, record), attributes].to_msgpack
end
def formatted_to_msgpack_binary?
true
end
@@ -65,10 +71,11 @@
topic = extract_placeholders(@topic, chunk.metadata)
messages = []
size = 0
- chunk.msgpack_each do |msg|
+ chunk.msgpack_each do |msg, attr|
+ msg = Fluent::GcloudPubSub::Message.new(msg, attr)
if msg.bytesize > @max_message_size
log.warn 'Drop a message because its size exceeds `max_message_size`', size: msg.bytesize
next
end
if messages.length + 1 > @max_messages || size + msg.bytesize > @max_total_size