lib/fluent/plugin/out_sns.rb in fluent-plugin-sns-2.0.2 vs lib/fluent/plugin/out_sns.rb in fluent-plugin-sns-2.1.0
- old
+ new
@@ -1,64 +1,90 @@
module Fluent
-
- require 'aws-sdk'
- class SNSOutput < Output
-
- Fluent::Plugin.register_output('sns', self)
-
- include SetTagKeyMixin
- config_set_default :include_tag_key, false
-
- include SetTimeKeyMixin
- config_set_default :include_time_key, true
-
- config_param :aws_key_id, :string, :default => nil
- config_param :aws_sec_key, :string, :default => nil
-
- config_param :sns_topic_name, :string
- config_param :sns_subject_key, :string, :default => nil
- config_param :sns_subject, :string, :default => nil
- config_param :sns_endpoint, :string, :default => 'sns.ap-northeast-1.amazonaws.com'
- config_param :proxy, :string, :default => ENV['HTTP_PROXY']
-
- def configure(conf)
- super
- end
-
- def start
- super
- options = {}
- options[:sns_endpoint] = @sns_endpoint
- options[:proxy_uri] = @proxy
- if @aws_key_id && @aws_sec_key
- options[:access_key_id] = @aws_key_id
- options[:secret_access_key] = @aws_sec_key
- end
- AWS.config(options)
-
- @sns = AWS::SNS.new
- @topic = get_topic
- end
-
- def shutdown
- super
- end
-
- def emit(tag, es, chain)
- chain.next
- es.each {|time,record|
- record["time"] = Time.at(time).localtime
- subject = record[@sns_subject_key] || @sns_subject || 'Fluent-Notification'
- @topic.publish(record.to_json, :subject => subject )
- }
- end
-
- def get_topic()
- @sns.topics.each do |topic|
- if @sns_topic_name == topic.name
- return topic
- end
- end
- end
+ require 'aws-sdk'
+
+ class SNSOutput < Output
+
+ Fluent::Plugin.register_output('sns', self)
+
+ include SetTagKeyMixin
+ config_set_default :include_tag_key, false
+
+ include SetTimeKeyMixin
+ config_set_default :include_time_key, true
+
+ config_param :aws_key_id, :string, :default => ENV['AWS_ACCESS_KEY_ID']
+ config_param :aws_sec_key, :string, :default => ENV['AWS_SECRET_ACCESS_KEY']
+
+ config_param :sns_topic_name, :string
+ config_param :sns_subject_template, :default => nil
+ config_param :sns_subject_key, :string, :default => nil
+ config_param :sns_subject, :string, :default => nil
+ config_param :sns_body_template, :default => nil
+ config_param :sns_body_key, :string, :default => nil
+ config_param :sns_body, :string, :default => nil
+ config_param :sns_endpoint, :string, :default => 'sns.ap-northeast-1.amazonaws.com'
+ config_param :proxy, :string, :default => ENV['HTTP_PROXY']
+
+ def configure(conf)
+ super
end
+
+ def start
+ super
+ options = {}
+ options[:sns_endpoint] = @sns_endpoint
+ options[:proxy_uri] = @proxy
+ if @aws_key_id && @aws_sec_key
+ options[:access_key_id] = @aws_key_id
+ options[:secret_access_key] = @aws_sec_key
+ end
+ AWS.config(options)
+
+ @sns = AWS::SNS.new
+ @topic = @sns.topics.find{|topic| @sns_topic_name == topic.name}
+
+ @subject_template = nil
+ unless @sns_body_template.nil?
+ template_file = open(@sns_subject_template)
+ @subject_template = ERB.new(template_file.read)
+ template_file.close
+ end
+
+ @body_template = nil
+ unless @sns_body_template.nil?
+ template_file = open(@sns_body_template)
+ @body_template = ERB.new(template_file.read)
+ template_file.close
+ end
+ end
+
+ def shutdown
+ super
+ end
+
+ def emit(tag, es, chain)
+ chain.next
+ es.each {|time,record|
+ record['time'] = Time.at(time).localtime
+ body = get_body(record).force_encoding('UTF-8')
+ subject = get_subject(record).force_encoding('UTF-8').gsub(/(\r\n|\r|\n)/, '')
+ puts "subject:#{subject}, body:#{body}"
+ @topic.publish( body, :subject => subject )
+ }
+ end
+
+ def get_subject(record)
+ unless @subject_template.nil?
+ return @subject_template.result(binding)
+ end
+ subject = record[@sns_subject_key].to_s || @sns_subject || 'Fluentd-Notification'
+ end
+
+ def get_body(record)
+ unless @body_template.nil?
+ return @body_template.result(binding)
+ end
+ record[@sns_body_key] || @sns_body || record.to_json
+ end
+ end
end