lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.4.0 vs lib/fluent/plugin/in_gcloud_pubsub.rb in fluent-plugin-gcloud-pubsub-custom-1.5.0

- old
+ new

@@ -1,15 +1,18 @@ require 'json' require 'webrick' +require 'fluent/plugin/compressable' require 'fluent/plugin/input' require 'fluent/plugin/parser' require 'fluent/plugin/gcloud_pubsub/client' module Fluent::Plugin class GcloudPubSubInput < Input + include Fluent::Plugin::Compressable + Fluent::Plugin.register_input('gcloud_pubsub', self) helpers :compat_parameters, :parser, :thread DEFAULT_PARSER_TYPE = 'json' @@ -46,10 +49,12 @@ config_param :enable_rpc, :bool, default: false desc 'Bind IP address for HTTP RPC.' config_param :rpc_bind, :string, default: '0.0.0.0' desc 'Port for HTTP RPC.' config_param :rpc_port, :integer, default: 24680 + desc 'Decompress messages' + config_param :decompression, :string, default: nil config_section :parse do config_set_default :@type, DEFAULT_PARSER_TYPE end @@ -112,10 +117,15 @@ else method(:dynamic_tag) end @parser = parser_create + @decompress = if @decompression == 'gzip' + method(:gzip_decompress) + else + method(:no_decompress) + end end def start super start_rpc if @enable_rpc @@ -166,10 +176,18 @@ def dynamic_tag(record) record.delete(@tag_key) || @tag end + def gzip_decompress(message) + decompress message + end + + def no_decompress(message) + message + end + def start_rpc log.info "listening http rpc server on http://#{@rpc_bind}:#{@rpc_port}/" @rpc_srv = WEBrick::HTTPServer.new( { BindAddress: @rpc_bind, @@ -219,10 +237,10 @@ event_streams = Hash.new do |hsh, key| hsh[key] = Fluent::MultiEventStream.new end messages.each do |m| - line = m.message.data.chomp + line = @decompress.call(m.message.data).chomp attributes = m.attributes @parser.parse(line) do |time, record| if time && record @attribute_keys.each do |key| record[key] = attributes[key]