Sha256: 571513bfbe2adf144c26a595c1f7d4a091a4b7e6fd92b165587cdfe75f3633d1
Contents?: true
Size: 1.58 KB
Versions: 2
Compression:
Stored size: 1.58 KB
Contents
require 'aws-sdk'

module Fluent
  # Buffered Fluentd output plugin, registered under the name 'sqs', that
  # sends each buffered record to an Amazon SQS queue as a JSON message body.
  class SQSOutput < BufferedOutput
    Fluent::Plugin.register_output('sqs', self)

    # Tag injection is off by default, time injection is on by default.
    include SetTagKeyMixin
    config_set_default :include_tag_key, false

    include SetTimeKeyMixin
    config_set_default :include_time_key, true

    # AWS credentials and the name of the target queue (no defaults declared).
    config_param :aws_key_id, :string
    config_param :aws_sec_key, :string
    config_param :queue_name, :string

    # Endpoint handed to AWS.config, and the per-message delay sent with
    # every record.
    config_param :sqs_endpoint, :string, :default => 'sqs.ap-northeast-1.amazonaws.com'
    config_param :delay_seconds, :integer, :default => 0
    #config_param :buffer_queue_limit, :integer, :default => 10

    # Delegates to the parent class; no plugin-specific validation is done.
    def configure(conf)
      super
    end

    # Passes the configured credentials and endpoint to AWS.config, then
    # obtains the queue object through `queues.create`.
    # NOTE(review): presumably `create` hands back the existing queue when
    # one with this name is already present - confirm against the aws-sdk docs.
    def start
      super

      AWS.config(
        :access_key_id     => @aws_key_id,
        :secret_access_key => @aws_sec_key,
        :sqs_endpoint      => @sqs_endpoint
      )

      @sqs   = AWS::SQS.new
      @queue = @sqs.queues.create(@queue_name)
    end

    # Delegates to the parent class; nothing plugin-specific is released here.
    def shutdown
      super
    end

    # Serializes a single record into the buffer as msgpack.
    # `tag` and `time` are not used directly by this method.
    def format(tag, time, record)
      record.to_msgpack
    end

    # Decodes every record in the chunk, wraps it as an SQS message hash
    # (JSON body plus the configured delay) and sends the messages in groups
    # of at most ten per `batch_send` call (SQS batch requests are capped at
    # ten entries).
    #
    # An error raised while sending a group is printed to $stderr and the
    # remaining groups are still attempted.
    # NOTE(review): because the error is not re-raised, the caller never
    # learns that a group failed - confirm this best-effort behaviour is
    # intended.
    def write(chunk)
      messages = []
      chunk.msgpack_each do |record|
        messages << { :message_body => record.to_json, :delay_seconds => @delay_seconds }
      end

      messages.each_slice(10) do |batch|
        begin
          @queue.batch_send(batch)
        rescue => e
          $stderr.puts e
        end
      end

      # Return nil explicitly; each_slice's return value differs across
      # Ruby versions.
      nil
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-sqs-0.2.3 | lib/fluent/plugin/out_sqs.rb |
fluent-plugin-sqs-0.2.1 | lib/fluent/plugin/out_sqs.rb |