Sha256: 0022f9921768bbac90ca30f7a2150207f453e78c9e354df0bae0bcfe2907cce0

Contents?: true

Size: 1.67 KB

Versions: 1

Compression:

Stored size: 1.67 KB

Contents

require 'fluent/plugin/input'

module Fluent::Plugin
  require 'aws-sdk'

  # Fluentd input plugin that polls an Amazon SQS queue on a background thread
  # and emits one event per received message.
  #
  # The polling thread is created in #start and stopped in #shutdown. Messages
  # are received through Aws::SQS::QueuePoller; per the aws-sdk documentation
  # the poller deletes each received batch once the poll block returns
  # normally.
  class SQSPollInput < Input
    Fluent::Plugin.register_input('sqs_poll', self)

    # Optional static credentials. They are only used when BOTH are set;
    # otherwise the client is built without explicit credentials.
    config_param :aws_access_key, :string, default: nil, secret: true
    config_param :aws_secret_key, :string, default: nil, secret: true
    # Tag attached to every emitted event.
    config_param :tag, :string
    # URL of the queue to poll. The region is taken from the second
    # dot-separated component of this URL (e.g. "sqs.<region>.amazonaws.com").
    config_param :sqs_url, :string
    # Passed to the poller as the per-request message limit.
    config_param :max_number_of_messages, :integer, default: 1

    # Starts the background polling thread.
    def start
      super

      @terminate = false
      @thread = Thread.new { poll }
    end

    # Asks the polling thread to stop and waits for it to finish.
    def shutdown
      super

      @terminate = true
      # @thread is nil when #start never ran (or failed before creating it).
      @thread.join if @thread
    end

    # Body of the polling thread: receives messages until shutdown is
    # requested and emits each of them to the Fluentd router.
    #
    # Errors raised by the poller itself are logged instead of letting the
    # thread die silently.
    def poll
      poller = Aws::SQS::QueuePoller.new(@sqs_url, client: build_client)

      # Check the stop flag before every receive request. Doing it here rather
      # than inside the message block means (a) an idle queue cannot keep
      # #shutdown waiting forever, and (b) the poller still gets to delete the
      # batch that was just emitted, so it is not redelivered later.
      poller.before_request { |_stats| throw :stop_polling if @terminate }

      poller.poll(max_number_of_messages: @max_number_of_messages) do |received|
        # QueuePoller yields a single message when max_number_of_messages is 1
        # and an Array otherwise. Kernel#Array is avoided on purpose: the
        # message is a Struct, which Array() would explode into field values.
        batch = received.is_a?(Array) ? received : [received]
        batch.each { |msg| emit_message(msg) }
      end
    rescue StandardError => e
      log.error('SQS polling stopped', error: e.to_s, error_class: e.class.to_s)
      log.warn_backtrace(e.backtrace)
    end

    private

    # Builds an SQS client scoped to this plugin instance, so that several
    # sqs_poll sources with different regions/credentials do not overwrite
    # each other through the process-wide Aws.config.
    def build_client
      options = {}

      region = @sqs_url.split('.')[1]
      # Leave the region unset when the URL has no dot-separated region part,
      # letting the SDK apply its own default region resolution.
      options[:region] = region if region

      if @aws_access_key && @aws_secret_key
        options[:credentials] = Aws::Credentials.new(@aws_access_key, @aws_secret_key)
      end

      Aws::SQS::Client.new(options)
    end

    # Emits a single SQS message as a Fluentd event. Failures are logged and
    # swallowed so one bad message does not stop the polling loop.
    # NOTE(review): a message whose emit fails is still deleted by the poller
    # afterwards — confirm this best-effort behaviour is intended.
    def emit_message(msg)
      record = {
        'body' => msg.body,
        'handle' => msg.receipt_handle,
        'id' => msg.message_id,
        'md5' => msg.md5_of_body,
        'sqs_receive_count' => msg.attributes['ApproximateReceiveCount']
      }
      router.emit(@tag, Time.now.to_i, record)
    rescue StandardError => e
      log.error("SQS exception", error: e.to_s, error_class: e.class.to_s)
      log.warn_backtrace(e.backtrace)
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-sqs-poll-0.2.1 lib/fluent/plugin/in_sqs_poll.rb