lib/logstash/outputs/scalyr.rb in logstash-output-scalyr-0.1.9 vs lib/logstash/outputs/scalyr.rb in logstash-output-scalyr-0.1.10.beta
- old
+ new
@@ -6,13 +6,11 @@
require "socket" # for Socket.gethostname
require "thread" # for safe queueing
require "uri" # for escaping user input
require 'json' # for converting event object to JSON for upload
-require 'net/http'
-require 'net/http/persistent'
-require 'net/https'
+require 'manticore'
require 'rbzip2'
require 'zlib'
require 'stringio'
require 'quantile'
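
Note: net/http/persistent is replaced here by Manticore, a JRuby HTTP client backed by Apache HttpClient. A minimal, self-contained sketch of the Manticore request style (placeholder endpoint and payload, not code from the plugin):

    require 'manticore'

    client = Manticore::Client.new(request_timeout: 60)

    # POST a JSON body; the returned Response exposes #code and #body.
    response = client.post(
      "https://agent.scalyr.com/addEvents",          # placeholder endpoint
      body:    '{"token": "REDACTED", "events": []}',
      headers: { "Content-Type" => "application/json" }
    )
    puts response.code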
@@ -25,26 +23,18 @@
#---------------------------------------------------------------------------------------------------------------------
class LogStash::Outputs::Scalyr < LogStash::Outputs::Base
config_name "scalyr"
- # For correctness reasons we need to limit this plugin to a single worker, a single worker will be single concurrency
- # anyway but we should be explicit.
- concurrency :single
+ concurrency :shared
  # The Scalyr API write token; these are available at https://www.scalyr.com/keys. This is the only compulsory configuration field required for proper upload
config :api_write_token, :validate => :string, :required => true
# If you have an EU-based Scalyr account, please use https://eu.scalyr.com/
config :scalyr_server, :validate => :string, :default => "https://agent.scalyr.com/"
- # Path to SSL bundle file.
- config :ssl_ca_bundle_path, :validate => :string, :default => "/etc/ssl/certs/ca-bundle.crt"
-
- # If we should append our built-in Scalyr cert to the one we find at `ssl_ca_bundle_path`.
- config :append_builtin_cert, :validate => :boolean, :default => true
-
# server_attributes is a dictionary of key value pairs that represents/identifies the logstash aggregator server
# (where this plugin is running). Keys are arbitrary except for the 'serverHost' key which holds special meaning to
# Scalyr and is given special treatment in the Scalyr UI. All of these attributes are optional (not required for logs
# to be correctly uploaded)
config :server_attributes, :validate => :hash, :default => nil
@@ -89,14 +79,19 @@
config :retry_initial_interval, :validate => :number, :default => 1
# Set max interval in seconds between bulk retries.
config :retry_max_interval, :validate => :number, :default => 64
- # The following two settings pertain to preventing Man-in-the-middle (MITM) attacks # echee TODO: eliminate?
+ # Whether or not to verify the SSL certificate of the Scalyr server; only set this to false for debugging.
config :ssl_verify_peer, :validate => :boolean, :default => true
- config :ssl_verify_depth, :validate => :number, :default => 5
+ # Path to SSL bundle file.
+ config :ssl_ca_bundle_path, :validate => :string, :default => "/etc/ssl/certs/ca-bundle.crt"
+
+ # If we should append our built-in Scalyr cert to the one we find at `ssl_ca_bundle_path`.
+ config :append_builtin_cert, :validate => :boolean, :default => true
+
config :max_request_buffer, :validate => :number, :default => 5500000 # echee TODO: eliminate?
config :force_message_encoding, :validate => :string, :default => nil
config :replace_invalid_utf8, :validate => :boolean, :default => false
# Valid options are bz2, deflate, or none.
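
Note: for context on retry_initial_interval and retry_max_interval above, retries back off starting at the initial interval and grow until capped at the max. A small standalone sketch, assuming a simple doubling scheme:

    retry_initial_interval = 1
    retry_max_interval = 64

    sleep_interval = retry_initial_interval
    intervals = []
    8.times do
      intervals << sleep_interval
      sleep_interval = [sleep_interval * 2, retry_max_interval].min
    end
    p intervals   # => [1, 2, 4, 8, 16, 32, 64, 64]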
@@ -130,18 +125,38 @@
config :status_parser, :validate => :string, :default => "logstash_plugin_metrics"
# Whether or not to create fresh quantile estimators after a status send. Depending on what you want to gather from
  # these stats, this may or may not be desirable.
config :flush_quantile_estimates_on_status_send, :validate => :boolean, :default => false
+
+ # Causes this plugin to act as if it successfully uploaded the logs, while actually doing no work and
+ # returning as quickly as possible.
+ config :noop_mode, :validate => :boolean, :default => false
+ # Manticore related options
+ config :http_connect_timeout, :validate => :number, :default => 10
+ config :http_socket_timeout, :validate => :number, :default => 10
+ config :http_request_timeout, :validate => :number, :default => 60
+ config :http_pool_max, :validate => :number, :default => 50
+ config :http_pool_max_per_route, :validate => :number, :default => 25
+
+ def initialize(*params)
+ super
+ # Request statistics are accumulated across multiple threads and must be accessed through a mutex
+ @stats_lock = Mutex.new
+ @send_stats = Mutex.new
+ end
+
def close
@running = false
@client_session.close if @client_session
end
public
def register
+ # This PRNG is used exclusively to decide when to sample statistics and serves no security-related purpose;
+ # for this reason we do not ensure thread safety for it.
@prng = Random.new
if @event_metrics_sample_rate < 0 or @event_metrics_sample_rate > 1
      raise LogStash::ConfigurationError, "Minimum possible value for 'event_metrics_sample_rate' is 0 (don't sample any events) and maximum is 1 (sample every event)"
end
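
Note: the new ssl_* and http_* settings presumably feed into the HTTP client built inside the client session (that code is not part of this diff). A hedged sketch of how they might map onto Manticore::Client's documented constructor options:

    client = Manticore::Client.new(
      connect_timeout:    10,     # http_connect_timeout
      socket_timeout:     10,     # http_socket_timeout
      request_timeout:    60,     # http_request_timeout
      pool_max:           50,     # http_pool_max
      pool_max_per_route: 25,     # http_pool_max_per_route
      ssl: {
        verify:  :strict,                          # ssl_verify_peer => true
        ca_file: "/etc/ssl/certs/ca-bundle.crt"    # ssl_ca_bundle_path
      }
    )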
@@ -221,13 +236,13 @@
# create a client session for uploading to Scalyr
@running = true
@client_session = Scalyr::Common::Client::ClientSession.new(
@logger, @add_events_uri,
- @compression_type, @compression_level,
- @ssl_verify_peer, @ssl_ca_bundle_path, @ssl_verify_depth,
- @append_builtin_cert, @record_stats_for_status, @flush_quantile_estimates_on_status_send
+ @compression_type, @compression_level, @ssl_verify_peer, @ssl_ca_bundle_path, @append_builtin_cert,
+ @record_stats_for_status, @flush_quantile_estimates_on_status_send,
+ @http_connect_timeout, @http_socket_timeout, @http_request_timeout, @http_pool_max, @http_pool_max_per_route
)
@logger.info("Started Scalyr output plugin", :class => self.class.name)
# Finally, send a status line to Scalyr
@@ -255,10 +270,13 @@
# Also note that event uploads are broken up into batches such that each batch is less than max_request_buffer.
# Increasing max_request_buffer beyond 3MB will lead to failed requests.
#
public
def multi_receive(events)
+ # Just return and pretend we did something if running in noop mode
+ return events if @noop_mode
+
start_time = Time.now.to_f
multi_event_request_array = build_multi_event_request_array(events)
# Loop over all array of multi-event requests, sending each multi-event to Scalyr
@@ -274,10 +292,11 @@
multi_event_request = multi_event_request_array.pop
      # For some reason a retry on the multi_receive may result in the request array containing `nil` elements; we
# ignore these.
if !multi_event_request.nil?
@client_session.post_add_events(multi_event_request[:body], false, multi_event_request[:serialization_duration])
+
sleep_interval = 0
result.push(multi_event_request)
end
rescue OpenSSL::SSL::SSLError => e
@@ -304,11 +323,11 @@
exc_data[:code] = e.response_code if e.code
exc_data[:body] = e.response_body if @logger.debug? and e.body
exc_data[:payload] = "\tSample payload: #{request[:body][0,1024]}..." if @logger.debug?
if e.is_commonly_retried?
# well-known retriable errors should be debug
- @logger.debug(message, exc_data)
+ @logger.error(message, exc_data)
else
# all other failed uploads should be errors
@logger.error(message, exc_data)
end
retry if @running
@@ -326,13 +345,15 @@
retry if @running
end
end
if records_count > 0
- @multi_receive_statistics[:total_multi_receive_secs] += (Time.now.to_f - start_time)
- @plugin_metrics[:multi_receive_duration_secs].observe(Time.now.to_f - start_time)
- @plugin_metrics[:multi_receive_event_count].observe(records_count)
+ @stats_lock.synchronize do
+ @multi_receive_statistics[:total_multi_receive_secs] += (Time.now.to_f - start_time)
+ @plugin_metrics[:multi_receive_duration_secs].observe(Time.now.to_f - start_time)
+ @plugin_metrics[:multi_receive_event_count].observe(records_count)
+ end
end
send_status
return result
end # def multi_receive
@@ -499,14 +520,16 @@
end_time = Time.now.to_f
flatten_nested_values_duration = end_time - start_time
end
if should_sample_event_metrics
- @plugin_metrics[:event_attributes_count].observe(record.count)
+ @stats_lock.synchronize do
+ @plugin_metrics[:event_attributes_count].observe(record.count)
- if @flatten_nested_values
- @plugin_metrics[:flatten_values_duration_secs].observe(flatten_nested_values_duration)
+ if @flatten_nested_values
+ @plugin_metrics[:flatten_values_duration_secs].observe(flatten_nested_values_duration)
+ end
end
end
# Use LogStash event.timestamp as the 'ts' Scalyr timestamp. Note that this may be overwritten by input
# filters so may not necessarily reflect the actual originating timestamp.
@@ -596,11 +619,11 @@
# final upload to Scalyr (from an array of events, and an optional hash of current threads)
# Note: The request body field will be json-encoded.
def create_multi_event_request(scalyr_events, current_threads, current_logs)
body = {
- :session => @session_id,
+ :session => @session_id + Thread.current.object_id.to_s,
:token => @api_write_token,
:events => scalyr_events,
}
add_client_timestamp_to_body body
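
Note: with concurrency :shared, several pipeline worker threads may build requests concurrently, and appending Thread.current.object_id gives each worker its own Scalyr session id. A standalone illustration (not plugin code):

    base = "session-"
    ids = Array.new(3) { Thread.new { base + Thread.current.object_id.to_s } }.map(&:value)
    p ids.uniq.length   # => 3, one distinct session id per thread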
@@ -636,37 +659,39 @@
end # def create_multi_event_request
# Retrieve batch and other event level metric values
def get_stats
- current_stats = @multi_receive_statistics.clone
+ @stats_lock.synchronize do
+ current_stats = @multi_receive_statistics.clone
- current_stats[:multi_receive_duration_p50] = @plugin_metrics[:multi_receive_duration_secs].query(0.5)
- current_stats[:multi_receive_duration_p90] = @plugin_metrics[:multi_receive_duration_secs].query(0.9)
- current_stats[:multi_receive_duration_p99] = @plugin_metrics[:multi_receive_duration_secs].query(0.99)
+ current_stats[:multi_receive_duration_p50] = @plugin_metrics[:multi_receive_duration_secs].query(0.5)
+ current_stats[:multi_receive_duration_p90] = @plugin_metrics[:multi_receive_duration_secs].query(0.9)
+ current_stats[:multi_receive_duration_p99] = @plugin_metrics[:multi_receive_duration_secs].query(0.99)
- current_stats[:multi_receive_event_count_p50] = @plugin_metrics[:multi_receive_event_count].query(0.5)
- current_stats[:multi_receive_event_count_p90] = @plugin_metrics[:multi_receive_event_count].query(0.9)
- current_stats[:multi_receive_event_count_p99] = @plugin_metrics[:multi_receive_event_count].query(0.99)
+ current_stats[:multi_receive_event_count_p50] = @plugin_metrics[:multi_receive_event_count].query(0.5)
+ current_stats[:multi_receive_event_count_p90] = @plugin_metrics[:multi_receive_event_count].query(0.9)
+ current_stats[:multi_receive_event_count_p99] = @plugin_metrics[:multi_receive_event_count].query(0.99)
- current_stats[:event_attributes_count_p50] = @plugin_metrics[:event_attributes_count].query(0.5)
- current_stats[:event_attributes_count_p90] = @plugin_metrics[:event_attributes_count].query(0.9)
- current_stats[:event_attributes_count_p99] = @plugin_metrics[:event_attributes_count].query(0.99)
+ current_stats[:event_attributes_count_p50] = @plugin_metrics[:event_attributes_count].query(0.5)
+ current_stats[:event_attributes_count_p90] = @plugin_metrics[:event_attributes_count].query(0.9)
+ current_stats[:event_attributes_count_p99] = @plugin_metrics[:event_attributes_count].query(0.99)
- if @flatten_nested_values
- # We only return those metrics in case flattening is enabled
- current_stats[:flatten_values_duration_secs_p50] = @plugin_metrics[:flatten_values_duration_secs].query(0.5)
- current_stats[:flatten_values_duration_secs_p90] = @plugin_metrics[:flatten_values_duration_secs].query(0.9)
- current_stats[:flatten_values_duration_secs_p99] = @plugin_metrics[:flatten_values_duration_secs].query(0.99)
- end
+ if @flatten_nested_values
+ # We only return those metrics in case flattening is enabled
+ current_stats[:flatten_values_duration_secs_p50] = @plugin_metrics[:flatten_values_duration_secs].query(0.5)
+ current_stats[:flatten_values_duration_secs_p90] = @plugin_metrics[:flatten_values_duration_secs].query(0.9)
+ current_stats[:flatten_values_duration_secs_p99] = @plugin_metrics[:flatten_values_duration_secs].query(0.99)
+ end
- if @flush_quantile_estimates_on_status_send
- @logger.debug "Recreating / reseting quantile estimator classes for plugin metrics"
- @plugin_metrics = get_new_metrics
- end
+ if @flush_quantile_estimates_on_status_send
+        @logger.debug "Recreating / resetting quantile estimator classes for plugin metrics"
+ @plugin_metrics = get_new_metrics
+ end
- current_stats
+ current_stats
+ end
end
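
Note: get_stats queries percentiles from the quantile gem's estimators. A minimal standalone example of that API (observe values, then query a quantile), using the gem required at the top of this file:

    require 'quantile'

    estimator = Quantile::Estimator.new
    (1..100).each { |v| estimator.observe(v) }

    puts estimator.query(0.5)    # approximately 50 (the median)
    puts estimator.query(0.99)   # approximately 99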
# Sends a status update to Scalyr by posting a log entry under the special logfile of 'logstash_plugin.log'
# Instead of creating a separate thread, let this method be invoked once at startup and then every 5 minutes
@@ -681,36 +706,37 @@
:attrs => {
'logfile' => "scalyr_logstash.log",
'plugin_id' => self.id,
}
}
-
- if !@last_status_transmit_time
- status_event[:attrs]['message'] = "Started Scalyr LogStash output plugin."
- status_event[:attrs]['serverHost'] = @node_hostname
- else
- cur_time = Time.now()
- return if (cur_time.to_i - @last_status_transmit_time.to_i) < @status_report_interval
- # echee TODO: get instance stats from session and create a status log line
- msg = 'plugin_status: '
- cnt = 0
- @client_session.get_stats.each do |k, v|
- val = v.instance_of?(Float) ? sprintf("%.4f", v) : v
- val = val.nil? ? 0 : val
- msg << ' ' if cnt > 0
- msg << "#{k.to_s}=#{val}"
- cnt += 1
+ @send_stats.synchronize do
+ if !@last_status_transmit_time
+ status_event[:attrs]['message'] = "Started Scalyr LogStash output plugin."
+ status_event[:attrs]['serverHost'] = @node_hostname
+ else
+ cur_time = Time.now()
+ return if (cur_time.to_i - @last_status_transmit_time.to_i) < @status_report_interval
+ # echee TODO: get instance stats from session and create a status log line
+ msg = 'plugin_status: '
+ cnt = 0
+ @client_session.get_stats.each do |k, v|
+ val = v.instance_of?(Float) ? sprintf("%.4f", v) : v
+ val = val.nil? ? 0 : val
+ msg << ' ' if cnt > 0
+ msg << "#{k.to_s}=#{val}"
+ cnt += 1
+ end
+ get_stats.each do |k, v|
+ val = v.instance_of?(Float) ? sprintf("%.4f", v) : v
+ val = val.nil? ? 0 : val
+ msg << ' ' if cnt > 0
+ msg << "#{k.to_s}=#{val}"
+ cnt += 1
+ end
+ status_event[:attrs]['message'] = msg
+ status_event[:attrs]['serverHost'] = @node_hostname
+ status_event[:attrs]['parser'] = @status_parser
end
- get_stats.each do |k, v|
- val = v.instance_of?(Float) ? sprintf("%.4f", v) : v
- val = val.nil? ? 0 : val
- msg << ' ' if cnt > 0
- msg << "#{k.to_s}=#{val}"
- cnt += 1
- end
- status_event[:attrs]['message'] = msg
- status_event[:attrs]['serverHost'] = @node_hostname
- status_event[:attrs]['parser'] = @status_parser
end
multi_event_request = create_multi_event_request([status_event], nil, nil)
@client_session.post_add_events(multi_event_request[:body], true, 0)
@last_status_transmit_time = Time.now()
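
Note: the status line assembled in send_status above is a space-separated list of key=value pairs, with floats rendered to four decimal places. A standalone illustration using made-up stat names and values:

    stats = { :total_multi_receive_secs => 1.23456, :multi_receive_event_count_p50 => 42 }

    msg = 'plugin_status: ' + stats.map { |k, v|
      val = v.instance_of?(Float) ? sprintf("%.4f", v) : v
      "#{k}=#{val}"
    }.join(' ')

    puts msg   # => plugin_status: total_multi_receive_secs=1.2346 multi_receive_event_count_p50=42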