Sha256: f7f6479eb6802b23d852de802afba9b483a6cfec6991151768a5b5772911361b

Contents?: true

Size: 1.58 KB

Versions: 4

Compression:

Stored size: 1.58 KB

Contents

module Fluent

    require 'aws-sdk'

    # Buffered Fluentd output plugin, registered under the type name 'sqs',
    # that sends each buffered record to an Amazon SQS queue as a JSON
    # message body.
    class SQSOutput < BufferedOutput

        Fluent::Plugin.register_output('sqs', self)

        # Number of messages handed to a single batch_send call.
        # The SQS SendMessageBatch API accepts at most 10 entries per request.
        SQS_BATCH_SEND_MAX_MSGS = 10

        include SetTagKeyMixin
        config_set_default :include_tag_key, false

        include SetTimeKeyMixin
        config_set_default :include_time_key, true

        config_param :aws_key_id, :string
        config_param :aws_sec_key, :string
        config_param :queue_name, :string
        config_param :sqs_endpoint, :string, :default => 'sqs.ap-northeast-1.amazonaws.com'
        config_param :delay_seconds, :integer, :default => 0
        #config_param :buffer_queue_limit, :integer, :default => 10

        # No plugin-specific validation is done here; configuration handling
        # is left entirely to the superclass.
        #
        # @param conf the configuration element passed in by Fluentd
        def configure(conf)
            super
        end

        # Sets the credentials on the AWS SDK, builds an SQS client for the
        # configured endpoint and obtains the target queue by name.
        #
        # NOTE(review): AWS.config is SDK-wide, so the credentials set here are
        # presumably visible to other AWS SDK users in the same process --
        # confirm before running alongside plugins that use different keys.
        def start
            super

            AWS.config(
                :access_key_id => @aws_key_id,
                :secret_access_key => @aws_sec_key)

            @sqs = AWS::SQS.new(
                :sqs_endpoint => @sqs_endpoint)
            # queues.create is used both to create the queue and to look it up.
            # NOTE(review): relies on SQS returning the existing queue when the
            # name is already taken -- confirm for queues with custom attributes.
            @queue = @sqs.queues.create(@queue_name)
        end

        # Nothing of this plugin's own to release; defers to the superclass.
        def shutdown
            super
        end

        # Serializes one record into the buffer as MessagePack.
        #
        # Only +record+ is written; +tag+ and +time+ are not used by this
        # method. (Presumably the included mixins have already copied them
        # into the record when the corresponding include_*_key option is on.)
        #
        # @param tag the event tag (unused here)
        # @param time the event time (unused here)
        # @param record the event record
        # @return [String] MessagePack encoding of +record+
        def format(tag, time, record)
            record.to_msgpack
        end

        # Flushes a buffer chunk to SQS.
        #
        # Every record in the chunk becomes one message whose body is the
        # record rendered as JSON, carrying the configured delay_seconds.
        # Messages are sent in groups of at most SQS_BATCH_SEND_MAX_MSGS.
        #
        # Errors raised by batch_send are deliberately NOT rescued here.
        # Swallowing them would make this method return normally, which tells
        # Fluentd the chunk was written and lets it discard the data. Letting
        # the exception propagate keeps the chunk in the buffer so that
        # Fluentd's retry logic can flush it again. Because the whole chunk is
        # retried, batches that had already been accepted may be delivered
        # more than once (at-least-once delivery).
        #
        # @param chunk the buffer chunk to flush; must respond to msgpack_each
        # @raise [StandardError] whatever batch_send raises on failure
        def write(chunk)
            records = []
            chunk.msgpack_each do |record|
                records << { :message_body => record.to_json, :delay_seconds => @delay_seconds }
            end

            records.each_slice(SQS_BATCH_SEND_MAX_MSGS) do |batch|
                @queue.batch_send(batch)
            end
        end
    end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
fluent-plugin-sqs-1.2.2 lib/fluent/plugin/out_sqs.rb
fluent-plugin-sqs-1.2.1 lib/fluent/plugin/out_sqs.rb
fluent-plugin-sqs-1.2.0 lib/fluent/plugin/out_sqs.rb
fluent-plugin-sqs-1.1.0 lib/fluent/plugin/out_sqs.rb