lib/logstash/outputs/scalyr.rb in logstash-output-scalyr-0.1.10.beta vs lib/logstash/outputs/scalyr.rb in logstash-output-scalyr-0.1.11.beta

- old
+ new

@@ -14,10 +14,11 @@ require 'stringio' require 'quantile' require 'scalyr/common/client' require "scalyr/common/util" +require "scalyr/constants" #--------------------------------------------------------------------------------------------------------------------- # Implements the Scalyr output plugin #--------------------------------------------------------------------------------------------------------------------- @@ -241,11 +242,11 @@ @compression_type, @compression_level, @ssl_verify_peer, @ssl_ca_bundle_path, @append_builtin_cert, @record_stats_for_status, @flush_quantile_estimates_on_status_send, @http_connect_timeout, @http_socket_timeout, @http_request_timeout, @http_pool_max, @http_pool_max_per_route ) - @logger.info("Started Scalyr output plugin", :class => self.class.name) + @logger.info(sprintf("Started Scalyr output plugin (%s)." % [PLUGIN_VERSION]), :class => self.class.name) # Finally, send a status line to Scalyr send_status end # def register @@ -254,11 +255,12 @@ def get_new_metrics return { :multi_receive_duration_secs => Quantile::Estimator.new, :multi_receive_event_count => Quantile::Estimator.new, :event_attributes_count => Quantile::Estimator.new, - :flatten_values_duration_secs => Quantile::Estimator.new + :flatten_values_duration_secs => Quantile::Estimator.new, + :batches_per_multi_receive => Quantile::Estimator.new } end # Receive an array of events and immediately upload them (without buffering). # The Logstash framework will call this plugin method whenever there is a list of events to upload to Scalyr. @@ -273,91 +275,97 @@ public def multi_receive(events) # Just return and pretend we did something if running in noop mode return events if @noop_mode - start_time = Time.now.to_f + begin + start_time = Time.now.to_f - multi_event_request_array = build_multi_event_request_array(events) - # Loop over all array of multi-event requests, sending each multi-event to Scalyr + multi_event_request_array = build_multi_event_request_array(events) + # Loop over all array of multi-event requests, sending each multi-event to Scalyr - sleep_interval = @retry_initial_interval - batch_num = 1 - total_batches = multi_event_request_array.length unless multi_event_request_array.nil? + sleep_interval = @retry_initial_interval + batch_num = 1 + total_batches = multi_event_request_array.length unless multi_event_request_array.nil? - result = [] - records_count = events.to_a.length + result = [] + records_count = events.to_a.length - while !multi_event_request_array.to_a.empty? - begin + while !multi_event_request_array.to_a.empty? multi_event_request = multi_event_request_array.pop - # For some reason a retry on the multi_receive may result in the request array containing `nil` elements, we - # ignore these. - if !multi_event_request.nil? - @client_session.post_add_events(multi_event_request[:body], false, multi_event_request[:serialization_duration]) + begin + # For some reason a retry on the multi_receive may result in the request array containing `nil` elements, we + # ignore these. + if !multi_event_request.nil? + @client_session.post_add_events(multi_event_request[:body], false, multi_event_request[:serialization_duration]) - sleep_interval = 0 - result.push(multi_event_request) - end + sleep_interval = @retry_initial_interval + batch_num += 1 + result.push(multi_event_request) + end - rescue OpenSSL::SSL::SSLError => e - # cannot rely on exception message, so we always log the following warning - @logger.error "SSL certificate verification failed. " + - "Please make sure your certificate bundle is configured correctly and points to a valid file. " + - "You can configure this with the ssl_ca_bundle_path configuration option. " + - "The current value of ssl_ca_bundle_path is '#{@ssl_ca_bundle_path}'" - @logger.error e.message - @logger.error "Discarding buffer chunk without retrying." + rescue Scalyr::Common::Client::ServerError, Scalyr::Common::Client::ClientError => e + sleep_interval = sleep_for(sleep_interval) + message = "Error uploading to Scalyr (will backoff-retry)" + exc_data = { + :url => e.url.to_s, + :message => e.message, + :batch_num => batch_num, + :total_batches => total_batches, + :record_count => multi_event_request[:record_count], + :payload_size => multi_event_request[:body].bytesize, + :will_retry_in_seconds => sleep_interval, + } + exc_data[:code] = e.code if e.code + exc_data[:body] = e.body if @logger.debug? and e.body + exc_data[:payload] = "\tSample payload: #{request[:body][0,1024]}..." if @logger.debug? + if e.is_commonly_retried? + # well-known retriable errors should be debug + @logger.debug(message, exc_data) + else + # all other failed uploads should be errors + @logger.error(message, exc_data) + end + sleep_interval *= 2 + retry if @running - rescue Scalyr::Common::Client::ServerError, Scalyr::Common::Client::ClientError => e - sleep_interval = sleep_for(sleep_interval) - message = "Error uploading to Scalyr (will backoff-retry)" - exc_data = { - :url => e.url.to_s, - :message => e.message, - :batch_num => batch_num, - :total_batches => total_batches, - :record_count => multi_event_request[:record_count], - :payload_size => multi_event_request[:body].bytesize, - :will_retry_in_seconds => sleep_interval, - } - exc_data[:code] = e.response_code if e.code - exc_data[:body] = e.response_body if @logger.debug? and e.body - exc_data[:payload] = "\tSample payload: #{request[:body][0,1024]}..." if @logger.debug? - if e.is_commonly_retried? - # well-known retriable errors should be debug - @logger.error(message, exc_data) - else - # all other failed uploads should be errors - @logger.error(message, exc_data) + rescue => e + # Any unexpected errors should be fully logged + @logger.error( + "Unexpected error occurred while uploading to Scalyr (will backoff-retry)", + :error_message => e.message, + :error_class => e.class.name, + :backtrace => e.backtrace + ) + @logger.debug("Failed multi_event_request", :multi_event_request => multi_event_request) + sleep_interval = sleep_for(sleep_interval) + sleep_interval *= 2 + retry if @running end - retry if @running - - rescue => e - # Any unexpected errors should be fully logged - @logger.error( - "Unexpected error occurred while uploading to Scalyr (will backoff-retry)", - :error_message => e.message, - :error_class => e.class.name, - :backtrace => e.backtrace - ) - @logger.debug("Failed multi_event_request", :multi_event_request => multi_event_request) - sleep_interval = sleep_for(sleep_interval) - retry if @running end - end - if records_count > 0 - @stats_lock.synchronize do - @multi_receive_statistics[:total_multi_receive_secs] += (Time.now.to_f - start_time) - @plugin_metrics[:multi_receive_duration_secs].observe(Time.now.to_f - start_time) - @plugin_metrics[:multi_receive_event_count].observe(records_count) + if records_count > 0 + @stats_lock.synchronize do + @multi_receive_statistics[:total_multi_receive_secs] += (Time.now.to_f - start_time) + @plugin_metrics[:multi_receive_duration_secs].observe(Time.now.to_f - start_time) + @plugin_metrics[:multi_receive_event_count].observe(records_count) + @plugin_metrics[:batches_per_multi_receive].observe(total_batches) + end end - end - send_status - return result + send_status + return result + + rescue => e + # Any unexpected errors should be fully logged + @logger.error( + "Unexpected error occurred while executing multi_receive.", + :error_message => e.message, + :error_class => e.class.name, + :backtrace => e.backtrace + ) + end end # def multi_receive # Builds an array of multi-event requests from LogStash events # Each array element is a request that groups multiple events (to be posted to Scalyr's addEvents endpoint) @@ -674,10 +682,14 @@ current_stats[:event_attributes_count_p50] = @plugin_metrics[:event_attributes_count].query(0.5) current_stats[:event_attributes_count_p90] = @plugin_metrics[:event_attributes_count].query(0.9) current_stats[:event_attributes_count_p99] = @plugin_metrics[:event_attributes_count].query(0.99) + current_stats[:batches_per_multi_receive_p50] = @plugin_metrics[:batches_per_multi_receive].query(0.5) + current_stats[:batches_per_multi_receive_p90] = @plugin_metrics[:batches_per_multi_receive].query(0.9) + current_stats[:batches_per_multi_receive_p99] = @plugin_metrics[:batches_per_multi_receive].query(0.99) + if @flatten_nested_values # We only return those metrics in case flattening is enabled current_stats[:flatten_values_duration_secs_p50] = @plugin_metrics[:flatten_values_duration_secs].query(0.5) current_stats[:flatten_values_duration_secs_p90] = @plugin_metrics[:flatten_values_duration_secs].query(0.9) current_stats[:flatten_values_duration_secs_p99] = @plugin_metrics[:flatten_values_duration_secs].query(0.99) @@ -708,11 +720,11 @@ 'plugin_id' => self.id, } } @send_stats.synchronize do if !@last_status_transmit_time - status_event[:attrs]['message'] = "Started Scalyr LogStash output plugin." + status_event[:attrs]['message'] = sprintf("Started Scalyr LogStash output plugin (%s)." % [PLUGIN_VERSION]) status_event[:attrs]['serverHost'] = @node_hostname else cur_time = Time.now() return if (cur_time.to_i - @last_status_transmit_time.to_i) < @status_report_interval # echee TODO: get instance stats from session and create a status log line @@ -736,10 +748,19 @@ status_event[:attrs]['serverHost'] = @node_hostname status_event[:attrs]['parser'] = @status_parser end end multi_event_request = create_multi_event_request([status_event], nil, nil) - @client_session.post_add_events(multi_event_request[:body], true, 0) + begin + @client_session.post_add_events(multi_event_request[:body], true, 0) + rescue => e + @logger.warn( + "Unexpected error occurred while uploading status to Scalyr", + :error_message => e.message, + :error_class => e.class.name + ) + return + end @last_status_transmit_time = Time.now() if @log_status_messages_to_stdout @logger.info msg end