lib/logstash/outputs/scalyr.rb in logstash-output-scalyr-0.1.9 vs lib/logstash/outputs/scalyr.rb in logstash-output-scalyr-0.1.10.beta

- old
+ new

@@ -6,13 +6,11 @@
 require "socket" # for Socket.gethostname
 require "thread" # for safe queueing
 require "uri" # for escaping user input
 require 'json' # for converting event object to JSON for upload
-require 'net/http'
-require 'net/http/persistent'
-require 'net/https'
+require 'manticore'
 require 'rbzip2'
 require 'zlib'
 require 'stringio'
 require 'quantile'

@@ -25,26 +23,18 @@
 #---------------------------------------------------------------------------------------------------------------------
 class LogStash::Outputs::Scalyr < LogStash::Outputs::Base
   config_name "scalyr"

-  # For correctness reasons we need to limit this plugin to a single worker, a single worker will be single concurrency
-  # anyway but we should be explicit.
-  concurrency :single
+  concurrency :shared

   # The Scalyr API write token, these are available at https://www.scalyr.com/keys. This is the only compulsory configuration field required for proper upload
   config :api_write_token, :validate => :string, :required => true

   # If you have an EU-based Scalyr account, please use https://eu.scalyr.com/
   config :scalyr_server, :validate => :string, :default => "https://agent.scalyr.com/"

-  # Path to SSL bundle file.
-  config :ssl_ca_bundle_path, :validate => :string, :default => "/etc/ssl/certs/ca-bundle.crt"
-
-  # If we should append our built-in Scalyr cert to the one we find at `ssl_ca_bundle_path`.
-  config :append_builtin_cert, :validate => :boolean, :default => true
-
   # server_attributes is a dictionary of key value pairs that represents/identifies the logstash aggregator server
   # (where this plugin is running). Keys are arbitrary except for the 'serverHost' key which holds special meaning to
   # Scalyr and is given special treatment in the Scalyr UI. All of these attributes are optional (not required for logs
   # to be correctly uploaded)
   config :server_attributes, :validate => :hash, :default => nil

@@ -89,14 +79,19 @@
   config :retry_initial_interval, :validate => :number, :default => 1

   # Set max interval in seconds between bulk retries.
   config :retry_max_interval, :validate => :number, :default => 64

-  # The following two settings pertain to preventing Man-in-the-middle (MITM) attacks
   # echee TODO: eliminate?
+  # Whether or not to verify the connection to Scalyr, only set to false for debugging.
   config :ssl_verify_peer, :validate => :boolean, :default => true
-  config :ssl_verify_depth, :validate => :number, :default => 5
+  # Path to SSL bundle file.
+  config :ssl_ca_bundle_path, :validate => :string, :default => "/etc/ssl/certs/ca-bundle.crt"
+
+  # If we should append our built-in Scalyr cert to the one we find at `ssl_ca_bundle_path`.
+  config :append_builtin_cert, :validate => :boolean, :default => true
+
   config :max_request_buffer, :validate => :number, :default => 5500000
   # echee TODO: eliminate?
   config :force_message_encoding, :validate => :string, :default => nil
   config :replace_invalid_utf8, :validate => :boolean, :default => false
   # Valid options are bz2, deflate, or none.

@@ -130,18 +125,38 @@
   config :status_parser, :validate => :string, :default => "logstash_plugin_metrics"

   # Whether or not to create fresh quantile estimators after a status send. Depending on what you want to gather from
   # these stats this might be wanted or not.
   config :flush_quantile_estimates_on_status_send, :validate => :boolean, :default => false
+
+  # Causes this plugin to act as if it successfully uploaded the logs, while actually doing no work and returning
+  # as quickly as possible.
+  config :noop_mode, :validate => :boolean, :default => false
+  # Manticore related options
+  config :http_connect_timeout, :validate => :number, :default => 10
+  config :http_socket_timeout, :validate => :number, :default => 10
+  config :http_request_timeout, :validate => :number, :default => 60
+  config :http_pool_max, :validate => :number, :default => 50
+  config :http_pool_max_per_route, :validate => :number, :default => 25
+
+  def initialize(*params)
+    super
+    # Request statistics are accumulated across multiple threads and must be accessed through a mutex
+    @stats_lock = Mutex.new
+    @send_stats = Mutex.new
+  end
+
   def close
     @running = false
     @client_session.close if @client_session
   end

   public
   def register
+    # This prng is used exclusively to determine when to sample statistics and serves no security related purpose,
+    # so we do not ensure thread safety for it.
     @prng = Random.new

     if @event_metrics_sample_rate < 0 or @event_metrics_sample_rate > 1
       raise LogStash::ConfigurationError, "Minimum possible value for 'event_metrics_sample_rate' is 0 (dont sample any events) and maximum is 1 (sample every event)"
     end

@@ -221,13 +236,13 @@
     # create a client session for uploading to Scalyr
     @running = true
     @client_session = Scalyr::Common::Client::ClientSession.new(
       @logger, @add_events_uri,
-      @compression_type, @compression_level,
-      @ssl_verify_peer, @ssl_ca_bundle_path, @ssl_verify_depth,
-      @append_builtin_cert, @record_stats_for_status, @flush_quantile_estimates_on_status_send
+      @compression_type, @compression_level, @ssl_verify_peer, @ssl_ca_bundle_path, @append_builtin_cert,
+      @record_stats_for_status, @flush_quantile_estimates_on_status_send,
+      @http_connect_timeout, @http_socket_timeout, @http_request_timeout, @http_pool_max, @http_pool_max_per_route
     )

     @logger.info("Started Scalyr output plugin", :class => self.class.name)

     # Finally, send a status line to Scalyr

@@ -255,10 +270,13 @@
   # Also note that event uploads are broken up into batches such that each batch is less than max_request_buffer.
   # Increasing max_request_buffer beyond 3MB will lead to failed requests.
   #
   public
   def multi_receive(events)
+    # Just return and pretend we did something if running in noop mode
+    return events if @noop_mode
+
     start_time = Time.now.to_f
     multi_event_request_array = build_multi_event_request_array(events)

     # Loop over all array of multi-event requests, sending each multi-event to Scalyr

@@ -274,10 +292,11 @@
         multi_event_request = multi_event_request_array.pop
         # For some reason a retry on the multi_receive may result in the request array containing `nil` elements, we
         # ignore these.
         if !multi_event_request.nil?
           @client_session.post_add_events(multi_event_request[:body], false, multi_event_request[:serialization_duration])
+          sleep_interval = 0
           result.push(multi_event_request)
         end
       rescue OpenSSL::SSL::SSLError => e

@@ -304,11 +323,11 @@
         exc_data[:code] = e.response_code if e.code
         exc_data[:body] = e.response_body if @logger.debug? and e.body
         exc_data[:payload] = "\tSample payload: #{request[:body][0,1024]}..." if @logger.debug?
         if e.is_commonly_retried?
           # well-known retriable errors should be debug
-          @logger.debug(message, exc_data)
+          @logger.error(message, exc_data)
         else
           # all other failed uploads should be errors
           @logger.error(message, exc_data)
         end
         retry if @running

@@ -326,13 +345,15 @@
         retry if @running
       end
     end

     if records_count > 0
-      @multi_receive_statistics[:total_multi_receive_secs] += (Time.now.to_f - start_time)
-      @plugin_metrics[:multi_receive_duration_secs].observe(Time.now.to_f - start_time)
-      @plugin_metrics[:multi_receive_event_count].observe(records_count)
+      @stats_lock.synchronize do
+        @multi_receive_statistics[:total_multi_receive_secs] += (Time.now.to_f - start_time)
+        @plugin_metrics[:multi_receive_duration_secs].observe(Time.now.to_f - start_time)
+        @plugin_metrics[:multi_receive_event_count].observe(records_count)
+      end
     end

     send_status
     return result
   end  # def multi_receive

@@ -499,14 +520,16 @@
         end_time = Time.now.to_f
         flatten_nested_values_duration = end_time - start_time
       end

       if should_sample_event_metrics
-        @plugin_metrics[:event_attributes_count].observe(record.count)
+        @stats_lock.synchronize do
+          @plugin_metrics[:event_attributes_count].observe(record.count)

-        if @flatten_nested_values
-          @plugin_metrics[:flatten_values_duration_secs].observe(flatten_nested_values_duration)
+          if @flatten_nested_values
+            @plugin_metrics[:flatten_values_duration_secs].observe(flatten_nested_values_duration)
+          end
         end
       end

       # Use LogStash event.timestamp as the 'ts' Scalyr timestamp. Note that this may be overwritten by input
       # filters so may not necessarily reflect the actual originating timestamp.

@@ -596,11 +619,11 @@
   # final upload to Scalyr (from an array of events, and an optional hash of current threads)
   # Note: The request body field will be json-encoded.
   def create_multi_event_request(scalyr_events, current_threads, current_logs)

     body = {
-      :session => @session_id,
+      :session => @session_id + Thread.current.object_id.to_s,
       :token => @api_write_token,
       :events => scalyr_events,
     }

     add_client_timestamp_to_body body

@@ -636,37 +659,39 @@
   end  # def create_multi_event_request

   # Retrieve batch and other event level metric values
   def get_stats
-    current_stats = @multi_receive_statistics.clone
+    @stats_lock.synchronize do
+      current_stats = @multi_receive_statistics.clone

-    current_stats[:multi_receive_duration_p50] = @plugin_metrics[:multi_receive_duration_secs].query(0.5)
-    current_stats[:multi_receive_duration_p90] = @plugin_metrics[:multi_receive_duration_secs].query(0.9)
-    current_stats[:multi_receive_duration_p99] = @plugin_metrics[:multi_receive_duration_secs].query(0.99)
+      current_stats[:multi_receive_duration_p50] = @plugin_metrics[:multi_receive_duration_secs].query(0.5)
+      current_stats[:multi_receive_duration_p90] = @plugin_metrics[:multi_receive_duration_secs].query(0.9)
+      current_stats[:multi_receive_duration_p99] = @plugin_metrics[:multi_receive_duration_secs].query(0.99)

-    current_stats[:multi_receive_event_count_p50] = @plugin_metrics[:multi_receive_event_count].query(0.5)
-    current_stats[:multi_receive_event_count_p90] = @plugin_metrics[:multi_receive_event_count].query(0.9)
-    current_stats[:multi_receive_event_count_p99] = @plugin_metrics[:multi_receive_event_count].query(0.99)
+      current_stats[:multi_receive_event_count_p50] = @plugin_metrics[:multi_receive_event_count].query(0.5)
+      current_stats[:multi_receive_event_count_p90] = @plugin_metrics[:multi_receive_event_count].query(0.9)
+      current_stats[:multi_receive_event_count_p99] = @plugin_metrics[:multi_receive_event_count].query(0.99)

-    current_stats[:event_attributes_count_p50] = @plugin_metrics[:event_attributes_count].query(0.5)
-    current_stats[:event_attributes_count_p90] = @plugin_metrics[:event_attributes_count].query(0.9)
-    current_stats[:event_attributes_count_p99] = @plugin_metrics[:event_attributes_count].query(0.99)
+      current_stats[:event_attributes_count_p50] = @plugin_metrics[:event_attributes_count].query(0.5)
+      current_stats[:event_attributes_count_p90] = @plugin_metrics[:event_attributes_count].query(0.9)
+      current_stats[:event_attributes_count_p99] = @plugin_metrics[:event_attributes_count].query(0.99)

-    if @flatten_nested_values
-      # We only return those metrics in case flattening is enabled
-      current_stats[:flatten_values_duration_secs_p50] = @plugin_metrics[:flatten_values_duration_secs].query(0.5)
-      current_stats[:flatten_values_duration_secs_p90] = @plugin_metrics[:flatten_values_duration_secs].query(0.9)
-      current_stats[:flatten_values_duration_secs_p99] = @plugin_metrics[:flatten_values_duration_secs].query(0.99)
-    end
+      if @flatten_nested_values
+        # We only return those metrics in case flattening is enabled
+        current_stats[:flatten_values_duration_secs_p50] = @plugin_metrics[:flatten_values_duration_secs].query(0.5)
+        current_stats[:flatten_values_duration_secs_p90] = @plugin_metrics[:flatten_values_duration_secs].query(0.9)
+        current_stats[:flatten_values_duration_secs_p99] = @plugin_metrics[:flatten_values_duration_secs].query(0.99)
+      end

-    if @flush_quantile_estimates_on_status_send
-      @logger.debug "Recreating / reseting quantile estimator classes for plugin metrics"
-      @plugin_metrics = get_new_metrics
-    end
+      if @flush_quantile_estimates_on_status_send
+        @logger.debug "Recreating / reseting quantile estimator classes for plugin metrics"
+        @plugin_metrics = get_new_metrics
+      end

-    current_stats
+      current_stats
+    end
   end

   # Sends a status update to Scalyr by posting a log entry under the special logfile of 'logstash_plugin.log'
   # Instead of creating a separate thread, let this method be invoked once at startup and then every 5 minutes

@@ -681,36 +706,37 @@
       :attrs => {
         'logfile' => "scalyr_logstash.log",
         'plugin_id' => self.id,
       }
     }
-
-    if !@last_status_transmit_time
-      status_event[:attrs]['message'] = "Started Scalyr LogStash output plugin."
-      status_event[:attrs]['serverHost'] = @node_hostname
-    else
-      cur_time = Time.now()
-      return if (cur_time.to_i - @last_status_transmit_time.to_i) < @status_report_interval
-      # echee TODO: get instance stats from session and create a status log line
-      msg = 'plugin_status: '
-      cnt = 0
-      @client_session.get_stats.each do |k, v|
-        val = v.instance_of?(Float) ? sprintf("%.4f", v) : v
-        val = val.nil? ? 0 : val
-        msg << ' ' if cnt > 0
-        msg << "#{k.to_s}=#{val}"
-        cnt += 1
+    @send_stats.synchronize do
+      if !@last_status_transmit_time
+        status_event[:attrs]['message'] = "Started Scalyr LogStash output plugin."
+        status_event[:attrs]['serverHost'] = @node_hostname
+      else
+        cur_time = Time.now()
+        return if (cur_time.to_i - @last_status_transmit_time.to_i) < @status_report_interval
+        # echee TODO: get instance stats from session and create a status log line
+        msg = 'plugin_status: '
+        cnt = 0
+        @client_session.get_stats.each do |k, v|
+          val = v.instance_of?(Float) ? sprintf("%.4f", v) : v
+          val = val.nil? ? 0 : val
+          msg << ' ' if cnt > 0
+          msg << "#{k.to_s}=#{val}"
+          cnt += 1
+        end
+        get_stats.each do |k, v|
+          val = v.instance_of?(Float) ? sprintf("%.4f", v) : v
+          val = val.nil? ? 0 : val
+          msg << ' ' if cnt > 0
+          msg << "#{k.to_s}=#{val}"
+          cnt += 1
+        end
+        status_event[:attrs]['message'] = msg
+        status_event[:attrs]['serverHost'] = @node_hostname
+        status_event[:attrs]['parser'] = @status_parser
      end
-      get_stats.each do |k, v|
-        val = v.instance_of?(Float) ? sprintf("%.4f", v) : v
-        val = val.nil? ? 0 : val
-        msg << ' ' if cnt > 0
-        msg << "#{k.to_s}=#{val}"
-        cnt += 1
-      end
-      status_event[:attrs]['message'] = msg
-      status_event[:attrs]['serverHost'] = @node_hostname
-      status_event[:attrs]['parser'] = @status_parser
    end

     multi_event_request = create_multi_event_request([status_event], nil, nil)
     @client_session.post_add_events(multi_event_request[:body], true, 0)
     @last_status_transmit_time = Time.now()
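
Note on the Manticore-related settings introduced in this release: the sketch below is a hypothetical illustration, not code taken from the plugin, of how http_connect_timeout, http_socket_timeout, http_request_timeout, http_pool_max and http_pool_max_per_route might map onto a Manticore client, and why a single pooled client fits the new `concurrency :shared` setting. The endpoint path and the request payload are placeholders.

  require 'json'
  require 'manticore'

  # Hypothetical client wiring that mirrors the new plugin defaults. Manticore
  # maintains an internal connection pool and its clients are thread safe,
  # which is what makes sharing one output across pipeline workers practical.
  client = Manticore::Client.new(
    connect_timeout: 10,       # http_connect_timeout
    socket_timeout: 10,        # http_socket_timeout
    request_timeout: 60,       # http_request_timeout
    pool_max: 50,              # http_pool_max
    pool_max_per_route: 25,    # http_pool_max_per_route
    ssl: { verify: :strict }   # roughly corresponds to ssl_verify_peer => true
  )

  # Illustrative request only; the real plugin builds its payload in
  # create_multi_event_request and posts through its ClientSession wrapper.
  payload = { token: "YOUR_WRITE_TOKEN", session: "example-session", events: [] }
  response = client.post(
    "https://agent.scalyr.com/addEvents",
    body: payload.to_json,
    headers: { "Content-Type" => "application/json" }
  )
  puts response.code

In the plugin itself these values are not used directly like this; they are passed through to Scalyr::Common::Client::ClientSession, as shown in the constructor change above.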