lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.4.0 vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.5.0
- old
+ new
@@ -1,15 +1,18 @@
require 'json'
require 'webrick'
+require 'fluent/plugin/compressable'
require 'fluent/plugin/input'
require 'fluent/plugin/parser'
require 'fluent/plugin/gcloud_pubsub/client'
module Fluent::Plugin
class GcloudPubSubInput < Input
+ include Fluent::Plugin::Compressable
+
Fluent::Plugin.register_input('gcloud_pubsub', self)
helpers :compat_parameters, :parser, :thread
DEFAULT_PARSER_TYPE = 'json'
@@ -46,10 +49,12 @@
config_param :enable_rpc, :bool, default: false
desc 'Bind IP address for HTTP RPC.'
config_param :rpc_bind, :string, default: '0.0.0.0'
desc 'Port for HTTP RPC.'
config_param :rpc_port, :integer, default: 24680
+ desc 'Decompress messages'
+ config_param :decompression, :string, default: nil
config_section :parse do
config_set_default :@type, DEFAULT_PARSER_TYPE
end
@@ -112,10 +117,15 @@
else
method(:dynamic_tag)
end
@parser = parser_create
+ @decompress = if @decompression == 'gzip'
+ method(:gzip_decompress)
+ else
+ method(:no_decompress)
+ end
end
def start
super
start_rpc if @enable_rpc
@@ -166,10 +176,18 @@
def dynamic_tag(record)
record.delete(@tag_key) || @tag
end
+ def gzip_decompress(message)
+ decompress message
+ end
+
+ def no_decompress(message)
+ message
+ end
+
def start_rpc
log.info "listening http rpc server on http://#{@rpc_bind}:#{@rpc_port}/"
@rpc_srv = WEBrick::HTTPServer.new(
{
BindAddress: @rpc_bind,
@@ -219,10 +237,10 @@
event_streams = Hash.new do |hsh, key|
hsh[key] = Fluent::MultiEventStream.new
end
messages.each do |m|
- line = m.message.data.chomp
+ line = @decompress.call(m.message.data).chomp
attributes = m.attributes
@parser.parse(line) do |time, record|
if time && record
@attribute_keys.each do |key|
record[key] = attributes[key]