Sha256: 0022f9921768bbac90ca30f7a2150207f453e78c9e354df0bae0bcfe2907cce0
Contents?: true
Size: 1.67 KB
Versions: 1
Compression:
Stored size: 1.67 KB
Contents
require 'fluent/plugin/input'
require 'aws-sdk'

module Fluent::Plugin
  # Fluentd input plugin that polls an Amazon SQS queue from a background
  # thread and emits one Fluentd event per received message.
  #
  # Each emitted record has the keys 'body', 'handle', 'id', 'md5' and
  # 'sqs_receive_count', taken from the SQS message.
  class SQSPollInput < Input
    Fluent::Plugin.register_input('sqs_poll', self)

    # Batch sizes accepted by the SQS ReceiveMessage API.
    MAX_NUMBER_OF_MESSAGES_RANGE = (1..10).freeze

    # Optional static credentials. Both must be set to be used; otherwise the
    # SDK's own credential resolution applies.
    config_param :aws_access_key, :string, default: nil, secret: true
    config_param :aws_secret_key, :string, default: nil, secret: true
    # Tag attached to every emitted event.
    config_param :tag, :string
    # Full queue URL; the AWS region is derived from it (see #region_from_url).
    config_param :sqs_url, :string
    # Maximum number of messages requested per receive call.
    config_param :max_number_of_messages, :integer, default: 1

    # Validates the configuration up front so that an out-of-range batch size
    # fails at startup instead of inside the polling thread.
    #
    # @param conf [Fluent::Config::Element] plugin configuration
    # @raise [Fluent::ConfigError] if max_number_of_messages is outside 1..10
    def configure(conf)
      super
      return if MAX_NUMBER_OF_MESSAGES_RANGE.cover?(@max_number_of_messages)

      raise Fluent::ConfigError,
            "max_number_of_messages must be between 1 and 10, got #{@max_number_of_messages}"
    end

    # Starts the background polling thread.
    def start
      super
      @terminate = false
      @thread = Thread.new(&method(:poll))
    end

    # Requests the polling thread to stop and waits for it to finish.
    # The thread notices the request before its next receive call, so this may
    # block until the in-flight receive request returns.
    def shutdown
      @terminate = true
      @thread.join if @thread
      super
    end

    # Thread body: polls the queue until #shutdown sets the terminate flag.
    def poll
      poller = Aws::SQS::QueuePoller.new(@sqs_url, client: build_client)

      # Check the terminate flag before every receive request rather than
      # inside the message block. This stops the loop even when the queue is
      # idle (the block is never invoked then), and it lets the poller delete
      # the batch it has just handed over before polling stops.
      poller.before_request { |_stats| throw :stop_polling if @terminate }

      poller.poll(max_number_of_messages: @max_number_of_messages) do |messages|
        # QueuePoller yields a single message (not an Array) when
        # max_number_of_messages is 1. Array() must not be used here: it would
        # expand the message struct into its field values.
        batch = messages.is_a?(Array) ? messages : [messages]
        batch.each { |msg| emit_message(msg) }
      end
    rescue StandardError => e
      # Log instead of letting the thread die silently and re-raise from
      # Thread#join during shutdown.
      log.error("SQS polling stopped", error: e.to_s, error_class: e.class.to_s)
      log.warn_backtrace(e.backtrace)
    end

    private

    # Builds an SQS client scoped to this plugin instance, so that the
    # process-wide Aws.config is left untouched.
    #
    # @return [Aws::SQS::Client]
    def build_client
      options = { region: region_from_url }
      if @aws_access_key && @aws_secret_key
        options[:credentials] = Aws::Credentials.new(@aws_access_key, @aws_secret_key)
      end
      Aws::SQS::Client.new(options)
    end

    # Derives the region from the queue URL by taking the second dot-separated
    # component, e.g. "https://sqs.us-east-1.amazonaws.com/..." => "us-east-1".
    # NOTE(review): URLs that do not follow this layout yield a wrong or nil
    # region — confirm which URL forms must be supported.
    #
    # @return [String, nil]
    def region_from_url
      @sqs_url.split('.')[1]
    end

    # Emits one SQS message as a Fluentd event. Failures are logged and
    # swallowed so that one bad message does not stop the polling loop.
    #
    # @param msg [Aws::SQS::Types::Message]
    def emit_message(msg)
      router.emit(
        @tag,
        Time.now.to_i,
        'body' => msg.body,
        'handle' => msg.receipt_handle,
        'id' => msg.message_id,
        'md5' => msg.md5_of_body,
        'sqs_receive_count' => msg.attributes['ApproximateReceiveCount']
      )
    rescue StandardError => e
      log.error("SQS exception", error: e.to_s, error_class: e.class.to_s)
      log.warn_backtrace(e.backtrace)
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-sqs-poll-0.2.1 | lib/fluent/plugin/in_sqs_poll.rb |