Sha256: d8e9e09798efc9450543c11c1da9735e33f6e91021c5255bc15db3f1299f4394

Contents?: true

Size: 1.57 KB

Versions: 1

Compression:

Stored size: 1.57 KB

Contents

module Fluent

  require 'aws-sdk'

  # Input plugin registered under the name 'sqs'. A background thread
  # repeatedly calls receive_message on the configured queue and emits one
  # event per received message under the configured tag.
  class SQSInput < Input
    Plugin.register_input('sqs', self)

    # Credentials handed to AWS.config in #start; both default to nil.
    config_param :aws_key_id, :string, :default => nil
    config_param :aws_sec_key, :string, :default => nil
    # Tag passed to Engine.emit for every event (no default declared).
    config_param :tag, :string
    # Endpoint passed to AWS::SQS.new as :sqs_endpoint.
    config_param :sqs_endpoint, :string, :default => 'sqs.ap-northeast-1.amazonaws.com'
    # Key used to look the queue up in AWS::SQS#queues (no default declared).
    config_param :sqs_url, :string
    # Pause taken before each receive attempt; declared as :time, default 1.
    config_param :receive_interval, :time, :default => 1

    # Applies the AWS credentials, resolves the queue object and spawns the
    # polling thread that runs #run_periodic.
    def start
      super

      AWS.config(
        :access_key_id => @aws_key_id,
        :secret_access_key => @aws_sec_key
      )

      @queue = AWS::SQS.new(:sqs_endpoint => @sqs_endpoint).queues[@sqs_url]

      @finished = false
      @thread = Thread.new(&method(:run_periodic))
    end

    # Flags the polling loop to stop and waits for its thread to exit. The
    # loop only re-checks the flag between iterations, so this can block for
    # the remainder of the current sleep plus any receive call in flight.
    def shutdown
      super

      @finished = true
      # @thread is assigned on the last line of #start; skip the join when
      # #start never got that far instead of raising NoMethodError on nil.
      @thread.join if @thread
    end

    # Polling loop executed on the background thread until @finished is set.
    # Each iteration sleeps for receive_interval, then asks the queue for a
    # message and emits it. Any StandardError raised while sleeping,
    # receiving or emitting is logged and the loop carries on.
    def run_periodic
      until @finished
        begin
          sleep @receive_interval
          @queue.receive_message do |message|
            # Keys are symbols and every value is stringified with to_s.
            record = {
              :body => message.body.to_s,
              :handle => message.handle.to_s,
              :id => message.id.to_s,
              :md5 => message.md5.to_s,
              :url => message.queue.url.to_s,
              :sender_id => message.sender_id.to_s
            }

            # Event time is the local wall clock at emit time, in seconds.
            Engine.emit(@tag, Time.now.to_i, record)
          end
        rescue => e
          $log.error "failed to emit or receive", :error => e.to_s, :error_class => e.class.to_s
          $log.warn_backtrace e.backtrace
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-sqs-1.3.2 lib/fluent/plugin/in_sqs.rb