Sha256: e10aa938b005f345b6d6e4629d92e5392a199579b406ed306b8e68afabb1c9e7

Contents?: true

Size: 1.79 KB

Versions: 3

Compression:

Stored size: 1.79 KB

Contents

module Fluent

    require 'aws-sdk'

    # Buffered output plugin that sends each buffered record to an Amazon SQS
    # queue as a JSON message body.
    #
    # Records are serialized with MessagePack while they sit in the buffer
    # (see #format) and converted to JSON when a chunk is flushed (see #write).
    class SQSOutput < BufferedOutput

        Fluent::Plugin.register_output('sqs', self)

        # Number of messages handed to a single batch_send call. The SQS
        # SendMessageBatch API accepts at most 10 entries per request.
        BATCH_SIZE = 10

        include SetTagKeyMixin
        config_set_default :include_tag_key, false

        include SetTimeKeyMixin
        config_set_default :include_time_key, true

        config_param :aws_key_id, :string, :default => nil
        config_param :aws_sec_key, :string, :default => nil
        config_param :queue_name, :string
        config_param :sqs_endpoint, :string, :default => 'sqs.ap-northeast-1.amazonaws.com'
        config_param :delay_seconds, :integer, :default => 0
        config_param :include_tag, :bool, :default => true
        config_param :tag_property_name, :string, :default => '__tag'
        #config_param :buffer_queue_limit, :integer, :default => 10

        # Applies the plugin configuration. No plugin-specific validation is
        # performed here; everything is delegated to the parent class.
        def configure(conf)
            super
        end

        # Sets the AWS credentials, builds the SQS client for the configured
        # endpoint and obtains the target queue via `queues.create`.
        #
        # NOTE(review): AWS.config changes process-wide SDK settings, so the
        # credentials given here presumably affect other AWS SDK users in the
        # same process - confirm before running several AWS plugins together.
        def start
            super

            AWS.config(
                :access_key_id => @aws_key_id,
                :secret_access_key => @aws_sec_key)

            @sqs = AWS::SQS.new(:sqs_endpoint => @sqs_endpoint)
            @queue = @sqs.queues.create(@queue_name)
        end

        # Nothing plugin-specific to release; delegates to the parent class.
        def shutdown
            super
        end

        # Serializes one event for the buffer.
        #
        # tag    - the event tag.
        # time   - the event time (not used here).
        # record - the event record (a Hash).
        #
        # Returns the record as a MessagePack string. When `include_tag` is
        # enabled, the tag is stored under `tag_property_name`.
        def format(tag, time, record)
            # Merge into a copy so that the caller's hash is left untouched;
            # the same record object may be handed to other consumers.
            record = record.merge(@tag_property_name => tag) if @include_tag

            record.to_msgpack
        end

        # Flushes one buffer chunk: every record in the chunk becomes one SQS
        # message (JSON body, configured delay), sent in batches of BATCH_SIZE.
        #
        # Delivery is best-effort: a batch that fails is reported on stderr and
        # skipped, and the remaining batches are still attempted.
        def write(chunk)
            messages = []
            chunk.msgpack_each do |record|
                messages << { :message_body => record.to_json, :delay_seconds => @delay_seconds }
            end

            messages.each_slice(BATCH_SIZE) do |batch|
                begin
                    @queue.batch_send(batch)
                rescue => e
                    # Say which plugin/queue failed and how much was dropped,
                    # not just the bare exception message.
                    $stderr.puts "out_sqs: failed to send #{batch.length} message(s) to queue '#{@queue_name}': #{e.class}: #{e.message}"
                end
            end
        end
    end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
fluent-plugin-sqs-1.5.0 lib/fluent/plugin/out_sqs.rb
fluent-plugin-sqs-1.4.4 lib/fluent/plugin/out_sqs.rb
fluent-plugin-sqs-1.3.2 lib/fluent/plugin/out_sqs.rb