lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.5.0 vs lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.6.0
- old
+ new
@@ -29,16 +29,20 @@
config_param :max_messages, :integer, :default => 1000
desc 'Publishing messages bytesize per request to Cloud Pub/Sub.'
config_param :max_total_size, :integer, :default => 9800000 # 9.8MB
desc 'Limit bytesize per message.'
config_param :max_message_size, :integer, :default => 4000000 # 4MB
- desc 'Publishing the set field as an attribute'
+ desc 'Publishing the set field as an attribute created from input message'
config_param :attribute_keys, :array, :default => []
+ desc 'Publishing the set field as an attribute created from input config params'
+ config_param :attribute_key_values, :hash, :default => {}
desc 'Set service endpoint'
config_param :endpoint, :string, :default => nil
desc 'Compress messages'
config_param :compression, :string, :default => nil
+ desc 'Set default timeout to use in publish requests'
+ config_param :timeout, :integer, :default => nil
config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
end
@@ -58,18 +62,19 @@
end
end
def start
super
- @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic, @dest_project, @endpoint
+ @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic, @dest_project, @endpoint, @timeout
end
def format(tag, time, record)
record = inject_values_to_record(tag, time, record)
attributes = {}
@attribute_keys.each do |key|
attributes[key] = record.delete(key)
end
+ attributes.merge! @attribute_key_values
[@compress.call(@formatter.format(tag, time, record)), attributes].to_msgpack
end
def formatted_to_msgpack_binary?
true