lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.4.0 vs lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.5.0

- old
+ new

@@ -1,11 +1,13 @@ +require 'fluent/plugin/compressable' require 'fluent/plugin/output' require 'fluent/plugin/gcloud_pubsub/client' require 'fluent/plugin_helper/inject' module Fluent::Plugin class GcloudPubSubOutput < Output + include Fluent::Plugin::Compressable include Fluent::PluginHelper::Inject Fluent::Plugin.register_output('gcloud_pubsub', self) helpers :compat_parameters, :formatter @@ -29,10 +31,14 @@ config_param :max_total_size, :integer, :default => 9800000 # 9.8MB desc 'Limit bytesize per message.' config_param :max_message_size, :integer, :default => 4000000 # 4MB desc 'Publishing the set field as an attribute' config_param :attribute_keys, :array, :default => [] + desc 'Set service endpoint' + config_param :endpoint, :string, :default => nil + desc 'Compress messages' + config_param :compression, :string, :default => nil config_section :buffer do config_set_default :@type, DEFAULT_BUFFER_TYPE end @@ -43,24 +49,29 @@ def configure(conf) compat_parameters_convert(conf, :buffer, :formatter) super placeholder_validate!(:topic, @topic) @formatter = formatter_create + @compress = if @compression == 'gzip' + method(:gzip_compress) + else + method(:no_compress) + end end def start super - @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic, @dest_project + @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic, @dest_project, @endpoint end def format(tag, time, record) record = inject_values_to_record(tag, time, record) attributes = {} @attribute_keys.each do |key| attributes[key] = record.delete(key) end - [@formatter.format(tag, time, record), attributes].to_msgpack + [@compress.call(@formatter.format(tag, time, record)), attributes].to_msgpack end def formatted_to_msgpack_binary? true end @@ -105,8 +116,16 @@ private def publish(topic, messages) log.debug "send message topic:#{topic} length:#{messages.length} size:#{messages.map(&:bytesize).inject(:+)}" @publisher.publish(topic, messages) + end + + def gzip_compress(message) + compress message + end + + def no_compress(message) + message end end end