Sha256: 6dc4a302a7b8c55448ecc6595b66f30fe942b62701eac449bbfc9230c28c32fc

Contents?: true

Size: 798 Bytes

Versions: 3

Compression:

Stored size: 798 Bytes

Contents

module Fluent::Plugin
  class WebHDFSOutput < Output
    class HadoopSnappyCompressor < Compressor
      WebHDFSOutput.register_compressor('hadoop_snappy', self)

      DEFAULT_BLOCK_SIZE = 256 * 1024

      desc 'Block size for compression algorithm'
      config_param :block_size, :integer, default: DEFAULT_BLOCK_SIZE

      def initialize(options = {})
        super()
        begin
          require "snappy"
        rescue LoadError
          raise Fluent::ConfigError, "Install snappy before using snappy compressor"
        end
      end

      def ext
        ".snappy"
      end

      def compress(chunk, tmp)
        Snappy::Hadoop::Writer.new(tmp, @block_size) do |w|
          w << chunk.read
          w.flush
        end
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
fluent-plugin-webhdfs-1.6.0 lib/fluent/plugin/webhdfs_compressor_hadoop_snappy.rb
fluent-plugin-webhdfs-1.5.0 lib/fluent/plugin/webhdfs_compressor_hadoop_snappy.rb
fluent-plugin-webhdfs-1.4.0 lib/fluent/plugin/webhdfs_compressor_hadoop_snappy.rb