Sha256: 4f7e8b7e966c89caf4a3238b43d9d1150af706e7d205a77b5f7f2d2a8cca1ff0

Contents?: true

Size: 1.62 KB

Versions: 1

Compression:

Stored size: 1.62 KB

Contents

module Fluent
  # Fluentd output plugin that treats each incoming record as a pointer to an
  # S3 object: it reads the bucket name and object key out of the record,
  # downloads the object, optionally gunzips it, parses the body as JSON with
  # Oj and re-emits the parsed result under the configured +tag+.
  class S3InputOutput < Output

    Fluent::Plugin.register_output('s3_input', self)

    # Define `router` method of v0.12 to support v0.10 or earlier
    unless method_defined?(:router)
      define_method("router") { Fluent::Engine }
    end

    # AWS credentials. When either one is missing the client is built without
    # explicit credentials and the SDK's own credential lookup is used.
    config_param :aws_key_id, :string, :default => ENV['AWS_ACCESS_KEY_ID'], :secret => true
    config_param :aws_sec_key, :string, :default => ENV['AWS_SECRET_ACCESS_KEY'], :secret => true
    # Region passed to the S3 client. Defaults to the previously hard-coded value.
    config_param :s3_region, :string, :default => 'us-east-1'
    # Name of the record field that holds the S3 bucket name.
    config_param :s3_bucket_key
    # Name of the record field that holds the S3 object key.
    config_param :s3_object_key_key
    # Tag used for the re-emitted records.
    config_param :tag
    # supports: gzip
    # Optional; any other value (or leaving it unset) reads the body as-is.
    config_param :uncompress, :string, :default => nil

    attr_accessor :s3

    def initialize
      super
      require 'net/http'
      require 'zlib' # Zlib::GzipReader is used when uncompress == "gzip"
      require 'oj'
      require 'aws-sdk'
    end

    # Builds the S3 client from the parsed configuration.
    #
    # @param conf the plugin configuration handed over by Fluentd
    def configure(conf)
      super
      client_options = { region: @s3_region }
      # Only pass explicit credentials when both halves are present.
      if @aws_key_id && @aws_sec_key
        client_options[:access_key_id] = @aws_key_id
        client_options[:secret_access_key] = @aws_sec_key
      end
      @s3 = Aws::S3::Client.new(client_options)
    end

    # Processes every event of the stream independently. A failure on one
    # record is logged and does not prevent the remaining records from being
    # processed, and the output chain is always advanced afterwards.
    #
    # @param tag [String] tag of the incoming events (not used for routing)
    # @param es event stream yielding +time, record+ pairs
    # @param chain output chain; +next+ is called once the stream is consumed
    def emit(tag, es, chain)
      es.each do |time, record|
        begin
          router.emit(@tag, time, load_s3_record(record))
        rescue => e
          $log.warn "s3_input: #{e.class} #{e.message} #{e.backtrace.join(', ')}"
        end
      end
      chain.next
    end

    private

    # Downloads the S3 object referenced by +record+ and parses it.
    #
    # @param record [Hash] incoming record containing the bucket/key fields
    # @return the value produced by Oj.load for the object's body
    def load_s3_record(record)
      resp = s3.get_object(bucket: record[s3_bucket_key], key: record[s3_object_key_key])
      # NOTE(review): Oj.load runs with Oj's default options on data fetched
      # from S3; if the objects are not trusted, confirm the default mode is
      # acceptable or pass an explicit safe mode.
      Oj.load(read_body(resp.body))
    end

    # Reads the whole object body, gunzipping it when configured to.
    #
    # @param body IO-like object returned by the S3 client
    # @return [String] the (uncompressed) body content
    def read_body(body)
      return body.read unless @uncompress == "gzip"
      # The block form closes the gzip reader once the content has been read.
      Zlib::GzipReader.wrap(body) { |gz| gz.read }
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-s3-input-0.0.1 lib/fluent/plugin/out_s3_input.rb