lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.5.0 vs lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.6.0

- old
+ new

@@ -29,16 +29,20 @@ config_param :max_messages, :integer, :default => 1000 desc 'Publishing messages bytesize per request to Cloud Pub/Sub.' config_param :max_total_size, :integer, :default => 9800000 # 9.8MB desc 'Limit bytesize per message.' config_param :max_message_size, :integer, :default => 4000000 # 4MB - desc 'Publishing the set field as an attribute' + desc 'Publishing the set field as an attribute created from input message' config_param :attribute_keys, :array, :default => [] + desc 'Publishing the set field as an attribute created from input config params' + config_param :attribute_key_values, :hash, :default => {} desc 'Set service endpoint' config_param :endpoint, :string, :default => nil desc 'Compress messages' config_param :compression, :string, :default => nil + desc 'Set default timeout to use in publish requests' + config_param :timeout, :integer, :default => nil config_section :buffer do config_set_default :@type, DEFAULT_BUFFER_TYPE end @@ -58,18 +62,19 @@ end end def start super - @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic, @dest_project, @endpoint + @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic, @dest_project, @endpoint, @timeout end def format(tag, time, record) record = inject_values_to_record(tag, time, record) attributes = {} @attribute_keys.each do |key| attributes[key] = record.delete(key) end + attributes.merge! @attribute_key_values [@compress.call(@formatter.format(tag, time, record)), attributes].to_msgpack end def formatted_to_msgpack_binary? true