Sha256: 45c8d9fec6c498141554190f00467a35d075a9ea2284d3c080a58f2d1b02572f

Contents?: true

Size: 1.53 KB

Versions: 4

Compression:

Stored size: 1.53 KB

Contents

module Fluent

  require 'aws-sdk'

  # Fluentd input plugin that polls an Amazon SQS queue and emits every
  # received message as an event under the configured tag.
  #
  # A polling thread is spawned in #start and stopped in #shutdown.
  class SQSInput < Input
    Plugin.register_input('sqs', self)

    def initialize
      super
    end

    # Credentials handed to AWS.config in #start.
    config_param :aws_key_id, :string
    config_param :aws_sec_key, :string
    # Tag attached to every emitted event.
    config_param :tag, :string
    # Endpoint passed to AWS::SQS.new as :sqs_endpoint.
    config_param :sqs_endpoint, :string, :default => 'sqs.ap-northeast-1.amazonaws.com'
    # Key used to look the queue up in the client's queue collection.
    config_param :sqs_url, :string
    # Value passed to `sleep` before each receive attempt.
    config_param :receive_interval, :time, :default => 1

    def configure(conf)
      super
    end

    # Configures the AWS SDK, resolves the queue and spawns the polling thread.
    def start
      super

      # NOTE: AWS.config is process-wide, not scoped to this plugin instance.
      AWS.config(
        :access_key_id => @aws_key_id,
        :secret_access_key => @aws_sec_key
      )

      @queue = AWS::SQS.new(:sqs_endpoint => @sqs_endpoint).queues[@sqs_url]

      @finished = false
      @thread = Thread.new(&method(:run_periodic))
    end

    # Signals the polling loop to stop and waits for the thread to finish.
    def shutdown
      super

      @finished = true
      # @thread is nil when #start raised before spawning the thread (or was
      # never called); joining unconditionally would raise NoMethodError here
      # and hide the original failure.
      @thread.join if @thread
    end

    # Polling loop executed on the background thread until @finished is set.
    #
    # Each iteration sleeps for @receive_interval, then asks the queue for a
    # message and emits it. Errors are logged and the loop continues; because
    # the sleep is inside the begin block, a persistently failing receive is
    # still throttled by the interval.
    def run_periodic
      until @finished
        begin
          sleep @receive_interval
          # NOTE(review): with the block form, aws-sdk v1 is documented to
          # delete the message once the block returns normally - confirm
          # against the SDK version in use.
          @queue.receive_message do |message|
            Engine.emit(@tag, Time.now.to_i, build_record(message))
          end
        rescue
          $log.error "failed to emit or receive", :error => $!.to_s, :error_class => $!.class.to_s
          $log.warn_backtrace $!.backtrace
        end
      end
    end

    private

    # Builds the event record for one received message.
    # Every value is stringified; keys are symbols, as in earlier releases.
    def build_record(message)
      {
        :body => message.body.to_s,
        :handle => message.handle.to_s,
        :id => message.id.to_s,
        :md5 => message.md5.to_s,
        :url => message.queue.url.to_s,
        :sender_id => message.sender_id.to_s
      }
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygem

Version Path
fluent-plugin-sqs-1.3.1 lib/fluent/plugin/in_sqs.rb
fluent-plugin-sqs-1.2.2 lib/fluent/plugin/in_sqs.rb
fluent-plugin-sqs-1.2.1 lib/fluent/plugin/in_sqs.rb
fluent-plugin-sqs-1.2.0 lib/fluent/plugin/in_sqs.rb