lib/logstash/outputs/scalyr.rb in logstash-output-scalyr-0.1.10.beta vs lib/logstash/outputs/scalyr.rb in logstash-output-scalyr-0.1.11.beta
- old
+ new
@@ -14,10 +14,11 @@
require 'stringio'
require 'quantile'
require 'scalyr/common/client'
require "scalyr/common/util"
+require "scalyr/constants"
#---------------------------------------------------------------------------------------------------------------------
# Implements the Scalyr output plugin
#---------------------------------------------------------------------------------------------------------------------
@@ -241,11 +242,11 @@
@compression_type, @compression_level, @ssl_verify_peer, @ssl_ca_bundle_path, @append_builtin_cert,
@record_stats_for_status, @flush_quantile_estimates_on_status_send,
@http_connect_timeout, @http_socket_timeout, @http_request_timeout, @http_pool_max, @http_pool_max_per_route
)
- @logger.info("Started Scalyr output plugin", :class => self.class.name)
+ @logger.info(sprintf("Started Scalyr output plugin (%s)." % [PLUGIN_VERSION]), :class => self.class.name)
# Finally, send a status line to Scalyr
send_status
end # def register
@@ -254,11 +255,12 @@
def get_new_metrics
return {
:multi_receive_duration_secs => Quantile::Estimator.new,
:multi_receive_event_count => Quantile::Estimator.new,
:event_attributes_count => Quantile::Estimator.new,
- :flatten_values_duration_secs => Quantile::Estimator.new
+ :flatten_values_duration_secs => Quantile::Estimator.new,
+ :batches_per_multi_receive => Quantile::Estimator.new
}
end
# Receive an array of events and immediately upload them (without buffering).
# The Logstash framework will call this plugin method whenever there is a list of events to upload to Scalyr.
@@ -273,91 +275,97 @@
public
def multi_receive(events)
# Just return and pretend we did something if running in noop mode
return events if @noop_mode
- start_time = Time.now.to_f
+ begin
+ start_time = Time.now.to_f
- multi_event_request_array = build_multi_event_request_array(events)
- # Loop over all array of multi-event requests, sending each multi-event to Scalyr
+ multi_event_request_array = build_multi_event_request_array(events)
+ # Loop over all array of multi-event requests, sending each multi-event to Scalyr
- sleep_interval = @retry_initial_interval
- batch_num = 1
- total_batches = multi_event_request_array.length unless multi_event_request_array.nil?
+ sleep_interval = @retry_initial_interval
+ batch_num = 1
+ total_batches = multi_event_request_array.length unless multi_event_request_array.nil?
- result = []
- records_count = events.to_a.length
+ result = []
+ records_count = events.to_a.length
- while !multi_event_request_array.to_a.empty?
- begin
+ while !multi_event_request_array.to_a.empty?
multi_event_request = multi_event_request_array.pop
- # For some reason a retry on the multi_receive may result in the request array containing `nil` elements, we
- # ignore these.
- if !multi_event_request.nil?
- @client_session.post_add_events(multi_event_request[:body], false, multi_event_request[:serialization_duration])
+ begin
+ # For some reason a retry on the multi_receive may result in the request array containing `nil` elements, we
+ # ignore these.
+ if !multi_event_request.nil?
+ @client_session.post_add_events(multi_event_request[:body], false, multi_event_request[:serialization_duration])
- sleep_interval = 0
- result.push(multi_event_request)
- end
+ sleep_interval = @retry_initial_interval
+ batch_num += 1
+ result.push(multi_event_request)
+ end
- rescue OpenSSL::SSL::SSLError => e
- # cannot rely on exception message, so we always log the following warning
- @logger.error "SSL certificate verification failed. " +
- "Please make sure your certificate bundle is configured correctly and points to a valid file. " +
- "You can configure this with the ssl_ca_bundle_path configuration option. " +
- "The current value of ssl_ca_bundle_path is '#{@ssl_ca_bundle_path}'"
- @logger.error e.message
- @logger.error "Discarding buffer chunk without retrying."
+ rescue Scalyr::Common::Client::ServerError, Scalyr::Common::Client::ClientError => e
+ sleep_interval = sleep_for(sleep_interval)
+ message = "Error uploading to Scalyr (will backoff-retry)"
+ exc_data = {
+ :url => e.url.to_s,
+ :message => e.message,
+ :batch_num => batch_num,
+ :total_batches => total_batches,
+ :record_count => multi_event_request[:record_count],
+ :payload_size => multi_event_request[:body].bytesize,
+ :will_retry_in_seconds => sleep_interval,
+ }
+ exc_data[:code] = e.code if e.code
+ exc_data[:body] = e.body if @logger.debug? and e.body
+ exc_data[:payload] = "\tSample payload: #{request[:body][0,1024]}..." if @logger.debug?
+ if e.is_commonly_retried?
+ # well-known retriable errors should be debug
+ @logger.debug(message, exc_data)
+ else
+ # all other failed uploads should be errors
+ @logger.error(message, exc_data)
+ end
+ sleep_interval *= 2
+ retry if @running
- rescue Scalyr::Common::Client::ServerError, Scalyr::Common::Client::ClientError => e
- sleep_interval = sleep_for(sleep_interval)
- message = "Error uploading to Scalyr (will backoff-retry)"
- exc_data = {
- :url => e.url.to_s,
- :message => e.message,
- :batch_num => batch_num,
- :total_batches => total_batches,
- :record_count => multi_event_request[:record_count],
- :payload_size => multi_event_request[:body].bytesize,
- :will_retry_in_seconds => sleep_interval,
- }
- exc_data[:code] = e.response_code if e.code
- exc_data[:body] = e.response_body if @logger.debug? and e.body
- exc_data[:payload] = "\tSample payload: #{request[:body][0,1024]}..." if @logger.debug?
- if e.is_commonly_retried?
- # well-known retriable errors should be debug
- @logger.error(message, exc_data)
- else
- # all other failed uploads should be errors
- @logger.error(message, exc_data)
+ rescue => e
+ # Any unexpected errors should be fully logged
+ @logger.error(
+ "Unexpected error occurred while uploading to Scalyr (will backoff-retry)",
+ :error_message => e.message,
+ :error_class => e.class.name,
+ :backtrace => e.backtrace
+ )
+ @logger.debug("Failed multi_event_request", :multi_event_request => multi_event_request)
+ sleep_interval = sleep_for(sleep_interval)
+ sleep_interval *= 2
+ retry if @running
end
- retry if @running
-
- rescue => e
- # Any unexpected errors should be fully logged
- @logger.error(
- "Unexpected error occurred while uploading to Scalyr (will backoff-retry)",
- :error_message => e.message,
- :error_class => e.class.name,
- :backtrace => e.backtrace
- )
- @logger.debug("Failed multi_event_request", :multi_event_request => multi_event_request)
- sleep_interval = sleep_for(sleep_interval)
- retry if @running
end
- end
- if records_count > 0
- @stats_lock.synchronize do
- @multi_receive_statistics[:total_multi_receive_secs] += (Time.now.to_f - start_time)
- @plugin_metrics[:multi_receive_duration_secs].observe(Time.now.to_f - start_time)
- @plugin_metrics[:multi_receive_event_count].observe(records_count)
+ if records_count > 0
+ @stats_lock.synchronize do
+ @multi_receive_statistics[:total_multi_receive_secs] += (Time.now.to_f - start_time)
+ @plugin_metrics[:multi_receive_duration_secs].observe(Time.now.to_f - start_time)
+ @plugin_metrics[:multi_receive_event_count].observe(records_count)
+ @plugin_metrics[:batches_per_multi_receive].observe(total_batches)
+ end
end
- end
- send_status
- return result
+ send_status
+ return result
+
+ rescue => e
+ # Any unexpected errors should be fully logged
+ @logger.error(
+ "Unexpected error occurred while executing multi_receive.",
+ :error_message => e.message,
+ :error_class => e.class.name,
+ :backtrace => e.backtrace
+ )
+ end
end # def multi_receive
# Builds an array of multi-event requests from LogStash events
# Each array element is a request that groups multiple events (to be posted to Scalyr's addEvents endpoint)
@@ -674,10 +682,14 @@
current_stats[:event_attributes_count_p50] = @plugin_metrics[:event_attributes_count].query(0.5)
current_stats[:event_attributes_count_p90] = @plugin_metrics[:event_attributes_count].query(0.9)
current_stats[:event_attributes_count_p99] = @plugin_metrics[:event_attributes_count].query(0.99)
+ current_stats[:batches_per_multi_receive_p50] = @plugin_metrics[:batches_per_multi_receive].query(0.5)
+ current_stats[:batches_per_multi_receive_p90] = @plugin_metrics[:batches_per_multi_receive].query(0.9)
+ current_stats[:batches_per_multi_receive_p99] = @plugin_metrics[:batches_per_multi_receive].query(0.99)
+
if @flatten_nested_values
# We only return those metrics in case flattening is enabled
current_stats[:flatten_values_duration_secs_p50] = @plugin_metrics[:flatten_values_duration_secs].query(0.5)
current_stats[:flatten_values_duration_secs_p90] = @plugin_metrics[:flatten_values_duration_secs].query(0.9)
current_stats[:flatten_values_duration_secs_p99] = @plugin_metrics[:flatten_values_duration_secs].query(0.99)
@@ -708,11 +720,11 @@
'plugin_id' => self.id,
}
}
@send_stats.synchronize do
if !@last_status_transmit_time
- status_event[:attrs]['message'] = "Started Scalyr LogStash output plugin."
+ status_event[:attrs]['message'] = sprintf("Started Scalyr LogStash output plugin (%s)." % [PLUGIN_VERSION])
status_event[:attrs]['serverHost'] = @node_hostname
else
cur_time = Time.now()
return if (cur_time.to_i - @last_status_transmit_time.to_i) < @status_report_interval
# echee TODO: get instance stats from session and create a status log line
@@ -736,10 +748,19 @@
status_event[:attrs]['serverHost'] = @node_hostname
status_event[:attrs]['parser'] = @status_parser
end
end
multi_event_request = create_multi_event_request([status_event], nil, nil)
- @client_session.post_add_events(multi_event_request[:body], true, 0)
+ begin
+ @client_session.post_add_events(multi_event_request[:body], true, 0)
+ rescue => e
+ @logger.warn(
+ "Unexpected error occurred while uploading status to Scalyr",
+ :error_message => e.message,
+ :error_class => e.class.name
+ )
+ return
+ end
@last_status_transmit_time = Time.now()
if @log_status_messages_to_stdout
@logger.info msg
end