lib/fluent/plugin/out_logentries.rb in fluent-plugin-logentries-0.1.1 vs lib/fluent/plugin/out_logentries.rb in fluent-plugin-logentries-0.2.0
- old
+ new
@@ -1,16 +1,21 @@
require 'socket'
+require 'yaml'
+require 'openssl'
class LogentriesOutput < Fluent::BufferedOutput
class ConnectionFailure < StandardError; end
# First, register the plugin. NAME is the name of this plugin
# and identifies the plugin in the configuration file.
Fluent::Plugin.register_output('logentries', self)
- config_param :host, :string
- config_param :port, :integer, :default => 80
- config_param :path, :string
+ config_param :use_ssl, :bool, :default => true
+ config_param :no_ssl_port, :integer, :default => 80
+ config_param :config_path, :string
+ config_param :max_retries, :integer, :default => 3
+ config_param :tag_access_log, :string, :default => 'logs-access'
+ config_param :tag_error_log, :string, :default => 'logs-error'
def configure(conf)
super
end
@@ -21,70 +26,90 @@
def shutdown
super
end
def client
- @_socket ||= TCPSocket.new @host, @port
+ @_socket ||= if @use_ssl
+ context = OpenSSL::SSL::SSLContext.new
+ socket = TCPSocket.new "api.logentries.com", 20000
+ ssl_client = OpenSSL::SSL::SSLSocket.new socket, context
+
+ ssl_client.connect
+ else
+ TCPSocket.new "data.logentries.com", @no_ssl_port
+ end
end
# This method is called when an event reaches Fluentd.
def format(tag, time, record)
return [tag, record].to_msgpack
end
- # Scan a given directory for logentries tokens
- def generate_token(path)
- tokens = {}
-
- Dir[path + "*.token"].each do |file|
- key = File.basename(file, ".token") #remove path/extension from filename
- #read the first line, remove unwanted char and close it
- tokens[key] = File.open(file, &:readline).gsub(/\r\n|\r|\n/, '')
+ # Create tokens hash
+ def generate_token
+ begin
+ YAML::load_file(@config_path)
+ rescue Exception => e
+ log.warn "Could not load configuration. #{e.message}"
end
-
- tokens
end
- # returns the correct token to use for a given tag / Records
+ # Returns the correct token to use for a given tag / records
def get_token(tag, record, tokens)
- tag ||= ""
- record ||= ""
+ tag ||= ""
+ message = record["message"]
+ # Config Structure
+ # -----------------------
+ # app-name:
+ # app: TOKEN
+ # access: TOKEN (optional)
+ # error: TOKEN (optional)
tokens.each do |key, value|
- if tag.index(key) != nil || record.index(key) != nil then
- return value
+ if tag.index(key) != nil || message.index(key) != nil then
+ default = value['app']
+
+ case tag
+ when @tag_access_log
+ return value['access'] || default
+ when @tag_error_log
+ return value['error'] || default
+
+ else
+ return default
+ end
end
end
return nil
end
# NOTE! This method is called by internal thread, not Fluentd's main thread. So IO wait doesn't affect other plugins.
def write(chunk)
- tokens = generate_token(@path)
+ tokens = generate_token()
+ return unless tokens.is_a? Hash
chunk.msgpack_each do |tag, record|
next unless record.is_a? Hash
+ next unless record.has_key? "message"
token = get_token(tag, record, tokens)
next if token.nil?
- if record.has_key?("message")
- send_logentries(record["message"] + ' ' + token)
- end
+ send_logentries(token + ' ' + record["message"])
end
end
def send_logentries(data)
retries = 0
begin
client.puts data
rescue Errno::ECONNREFUSED, Errno::ETIMEDOUT => e
- if retries < 2
+ if retries < @max_retries
retries += 1
@_socket = nil
log.warn "Could not push logs to Logentries, resetting connection and trying again. #{e.message}"
- sleep 2**retries
+ sleep 5**retries
retry
end
raise ConnectionFailure, "Could not push logs to Logentries after #{retries} retries. #{e.message}"
end
end