Sha256: 12e8030ec046d7ef4c215b06bfec7ce7b725b8e31fa65829d95dfa42943e02a3
Contents?: true
Size: 1.71 KB
Versions: 3
Compression:
Stored size: 1.71 KB
Contents
module Fluent require 'aws-sdk' class SQSInput < Input Plugin.register_input('sqs', self) def initialize super end config_param :aws_key_id, :string, :default => nil, :secret => true config_param :aws_sec_key, :string, :default => nil, :secret => true config_param :tag, :string config_param :sqs_endpoint, :string, :default => 'sqs.ap-northeast-1.amazonaws.com' config_param :sqs_url, :string config_param :receive_interval, :time, :default => 1 config_param :max_number_of_messages, :integer, :default => 1 def configure(conf) super end def start super AWS.config( :access_key_id => @aws_key_id, :secret_access_key => @aws_sec_key ) @queue = AWS::SQS.new(:sqs_endpoint => @sqs_endpoint).queues[@sqs_url] @finished = false @thread = Thread.new(&method(:run_periodic)) end def shutdown super @finished = true @thread.join end def run_periodic until @finished begin sleep @receive_interval @queue.receive_message(:limit => @max_number_of_messages) do |message| record = {} record['body'] = message.body.to_s record['handle'] = message.handle.to_s record['id'] = message.id.to_s record['md5'] = message.md5.to_s record['url'] = message.queue.url.to_s record['sender_id'] = message.sender_id.to_s Engine.emit(@tag, Time.now.to_i, record) end rescue $log.error "failed to emit or receive", :error => $!.to_s, :error_class => $!.class.to_s $log.warn_backtrace $!.backtrace end end end end end
Version data entries
3 entries across 3 versions & 2 rubygems
Version | Path |
---|---|
fluent-plugin-sqs-1.6.1 | lib/fluent/plugin/in_sqs.rb |
fluent-plugin-sqs-check-size-0.1.0 | lib/fluent/plugin/in_sqs.rb |
fluent-plugin-sqs-1.6.0 | lib/fluent/plugin/in_sqs.rb |