Sha256: 1fd4571a9c20d1f5fbc37561589b8251d9484fd5f60e9465b2b6c81b27fd94ad
Contents?: true
Size: 1.84 KB
Versions: 1
Compression:
Stored size: 1.84 KB
Contents
module Fluent require 'aws-sdk' class SQSInput < Input Plugin.register_input('sqs', self) def initialize super end config_param :aws_key_id, :string, :default => nil, :secret => true config_param :aws_sec_key, :string, :default => nil, :secret => true config_param :tag, :string config_param :sqs_endpoint, :string, :default => 'sqs.ap-northeast-1.amazonaws.com' config_param :sqs_url, :string config_param :receive_interval, :time, :default => 0.1 config_param :max_number_of_messages, :integer, :default => 10 config_param :wait_time_seconds, :integer, :default => 10 def configure(conf) super end def start super AWS.config( :access_key_id => @aws_key_id, :secret_access_key => @aws_sec_key ) @queue = AWS::SQS.new(:sqs_endpoint => @sqs_endpoint).queues[@sqs_url] @finished = false @thread = Thread.new(&method(:run_periodic)) end def shutdown super @finished = true @thread.join end def run_periodic until @finished begin sleep @receive_interval @queue.receive_message( :limit => @max_number_of_messages, :wait_time_seconds => @wait_time_seconds ) do |message| record = {} record['body'] = message.body.to_s record['handle'] = message.handle.to_s record['id'] = message.id.to_s record['md5'] = message.md5.to_s record['url'] = message.queue.url.to_s record['sender_id'] = message.sender_id.to_s Engine.emit(@tag, Time.now.to_i, record) end rescue $log.error "failed to emit or receive", :error => $!.to_s, :error_class => $!.class.to_s $log.warn_backtrace $!.backtrace end end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-sqs-1.6.2 | lib/fluent/plugin/in_sqs.rb |