lib/fluent/plugin/out_logentries.rb in fluent-plugin-logentries-0.1.1 vs lib/fluent/plugin/out_logentries.rb in fluent-plugin-logentries-0.2.0

- old
+ new

@@ -1,16 +1,21 @@ require 'socket' +require 'yaml' +require 'openssl' class LogentriesOutput < Fluent::BufferedOutput class ConnectionFailure < StandardError; end # First, register the plugin. NAME is the name of this plugin # and identifies the plugin in the configuration file. Fluent::Plugin.register_output('logentries', self) - config_param :host, :string - config_param :port, :integer, :default => 80 - config_param :path, :string + config_param :use_ssl, :bool, :default => true + config_param :no_ssl_port, :integer, :default => 80 + config_param :config_path, :string + config_param :max_retries, :integer, :default => 3 + config_param :tag_access_log, :string, :default => 'logs-access' + config_param :tag_error_log, :string, :default => 'logs-error' def configure(conf) super end @@ -21,70 +26,90 @@ def shutdown super end def client - @_socket ||= TCPSocket.new @host, @port + @_socket ||= if @use_ssl + context = OpenSSL::SSL::SSLContext.new + socket = TCPSocket.new "api.logentries.com", 20000 + ssl_client = OpenSSL::SSL::SSLSocket.new socket, context + + ssl_client.connect + else + TCPSocket.new "data.logentries.com", @no_ssl_port + end end # This method is called when an event reaches Fluentd. def format(tag, time, record) return [tag, record].to_msgpack end - # Scan a given directory for logentries tokens - def generate_token(path) - tokens = {} - - Dir[path + "*.token"].each do |file| - key = File.basename(file, ".token") #remove path/extension from filename - #read the first line, remove unwanted char and close it - tokens[key] = File.open(file, &:readline).gsub(/\r\n|\r|\n/, '') + # Create tokens hash + def generate_token + begin + YAML::load_file(@config_path) + rescue Exception => e + log.warn "Could not load configuration. #{e.message}" end - - tokens end - # returns the correct token to use for a given tag / Records + # Returns the correct token to use for a given tag / records def get_token(tag, record, tokens) - tag ||= "" - record ||= "" + tag ||= "" + message = record["message"] + # Config Structure + # ----------------------- + # app-name: + # app: TOKEN + # access: TOKEN (optional) + # error: TOKEN (optional) tokens.each do |key, value| - if tag.index(key) != nil || record.index(key) != nil then - return value + if tag.index(key) != nil || message.index(key) != nil then + default = value['app'] + + case tag + when @tag_access_log + return value['access'] || default + when @tag_error_log + return value['error'] || default + + else + return default + end end end return nil end # NOTE! This method is called by internal thread, not Fluentd's main thread. So IO wait doesn't affect other plugins. def write(chunk) - tokens = generate_token(@path) + tokens = generate_token() + return unless tokens.is_a? Hash chunk.msgpack_each do |tag, record| next unless record.is_a? Hash + next unless record.has_key? "message" token = get_token(tag, record, tokens) next if token.nil? - if record.has_key?("message") - send_logentries(record["message"] + ' ' + token) - end + send_logentries(token + ' ' + record["message"]) end end def send_logentries(data) retries = 0 begin client.puts data rescue Errno::ECONNREFUSED, Errno::ETIMEDOUT => e - if retries < 2 + if retries < @max_retries retries += 1 @_socket = nil log.warn "Could not push logs to Logentries, resetting connection and trying again. #{e.message}" - sleep 2**retries + sleep 5**retries retry end raise ConnectionFailure, "Could not push logs to Logentries after #{retries} retries. #{e.message}" end end