Sha256: 3fb9abd588f095b10c881d43815fc245ebc32e098e1e4df2d32fb61e39285aac

Contents?: true

Size: 1.57 KB

Versions: 1

Compression:

Stored size: 1.57 KB

Contents

module Fluent

  require 'aws-sdk'

  class SQSInput < Input
    Plugin.register_input('sqs', self)

    def initialize
      super
    end

    config_param :aws_key_id, :string, :default => nil
    config_param :aws_sec_key, :string, :default => nil
    config_param :tag, :string
    config_param :sqs_endpoint, :string, :default => 'sqs.ap-northeast-1.amazonaws.com'
    config_param :sqs_url, :string
    config_param :receive_interval, :time, :default => 1

    def configure(conf)
      super

    end

    def start
      super

      AWS.config(
        :access_key_id => @aws_key_id,
        :secret_access_key => @aws_sec_key
        )

      @queue = AWS::SQS.new(:sqs_endpoint => @sqs_endpoint).queues[@sqs_url]

      @finished = false
      @thread = Thread.new(&method(:run_periodic))
    end

    def shutdown
      super

      @finished = true
      @thread.join
    end

    def run_periodic
      until @finished
        begin
          sleep @receive_interval
          @queue.receive_message do |message|
            record = {}
            record['body'] = message.body.to_s
            record['handle'] = message.handle.to_s
            record['id'] = message.id.to_s
            record['md5'] = message.md5.to_s
            record['url'] = message.queue.url.to_s
            record['sender_id'] = message.sender_id.to_s

            Engine.emit(@tag, Time.now.to_i, record)
          end
        rescue
          $log.error "failed to emit or receive", :error => $!.to_s, :error_class => $!.class.to_s
          $log.warn_backtrace $!.backtrace
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-sqs-1.4.4 lib/fluent/plugin/in_sqs.rb