lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.4.0 vs lib/fluent/plugin/out_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.5.0
- old
+ new
@@ -1,11 +1,13 @@
+require 'fluent/plugin/compressable'
require 'fluent/plugin/output'
require 'fluent/plugin/gcloud_pubsub/client'
require 'fluent/plugin_helper/inject'
module Fluent::Plugin
class GcloudPubSubOutput < Output
+ include Fluent::Plugin::Compressable
include Fluent::PluginHelper::Inject
Fluent::Plugin.register_output('gcloud_pubsub', self)
helpers :compat_parameters, :formatter
@@ -29,10 +31,14 @@
config_param :max_total_size, :integer, :default => 9800000 # 9.8MB
desc 'Limit bytesize per message.'
config_param :max_message_size, :integer, :default => 4000000 # 4MB
desc 'Publishing the set field as an attribute'
config_param :attribute_keys, :array, :default => []
+ desc 'Set service endpoint'
+ config_param :endpoint, :string, :default => nil
+ desc 'Compress messages'
+ config_param :compression, :string, :default => nil
config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
end
@@ -43,24 +49,29 @@
def configure(conf)
compat_parameters_convert(conf, :buffer, :formatter)
super
placeholder_validate!(:topic, @topic)
@formatter = formatter_create
+ @compress = if @compression == 'gzip'
+ method(:gzip_compress)
+ else
+ method(:no_compress)
+ end
end
def start
super
- @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic, @dest_project
+ @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic, @dest_project, @endpoint
end
def format(tag, time, record)
record = inject_values_to_record(tag, time, record)
attributes = {}
@attribute_keys.each do |key|
attributes[key] = record.delete(key)
end
- [@formatter.format(tag, time, record), attributes].to_msgpack
+ [@compress.call(@formatter.format(tag, time, record)), attributes].to_msgpack
end
def formatted_to_msgpack_binary?
true
end
@@ -105,8 +116,16 @@
private
def publish(topic, messages)
log.debug "send message topic:#{topic} length:#{messages.length} size:#{messages.map(&:bytesize).inject(:+)}"
@publisher.publish(topic, messages)
+ end
+
+ def gzip_compress(message)
+ compress message
+ end
+
+ def no_compress(message)
+ message
end
end
end