lib/logstash/outputs/scalyr.rb in logstash-output-scalyr-0.1.13 vs lib/logstash/outputs/scalyr.rb in logstash-output-scalyr-0.1.14.beta

- old
+ new

@@ -76,10 +76,14 @@ config :flat_tag_prefix, :validate => :string, :default => 'tag_' config :flat_tag_value, :default => 1 # Initial interval in seconds between bulk retries. Doubled on each retry up to `retry_max_interval` config :retry_initial_interval, :validate => :number, :default => 1 + # How many times to retry sending an event before giving up on it + config :max_retries, :validate => :number, :default => 5 + # Whether or not to send messages that failed to send a max_retries amount of times to the DLQ or just drop them + config :send_to_dlq, :validate => :boolean, :default => true # Set max interval in seconds between bulk retries. config :retry_max_interval, :validate => :number, :default => 64 # Whether or not to verify the connection to Scalyr, only set to false for debugging. @@ -317,10 +321,11 @@ sleep_interval = sleep_for(sleep_interval) exc_sleep += sleep_interval exc_retries += 1 message = "Error uploading to Scalyr (will backoff-retry)" exc_data = { + :error_class => e.e_class, :url => e.url.to_s, :message => e.message, :batch_num => batch_num, :total_batches => total_batches, :record_count => multi_event_request[:record_count], @@ -328,11 +333,11 @@ :will_retry_in_seconds => sleep_interval, } exc_data[:code] = e.code if e.code if @logger.debug? and e.body exc_data[:body] = e.body - elsif e.message == "Invalid JSON response from server" and e.body + elsif e.body exc_data[:body] = Scalyr::Common::Util.truncate(e.body, 512) end exc_data[:payload] = "\tSample payload: #{request[:body][0,1024]}..." if @logger.debug? if e.is_commonly_retried? 
# well-known retriable errors should be debug @@ -341,11 +346,13 @@ else # all other failed uploads should be errors @logger.error(message, exc_data) exc_commonly_retried = false end - retry if @running + retry if @running and exc_retries < @max_retries + log_retry_failure(multi_event_request, exc_data, exc_retries, exc_sleep) + next rescue => e # Any unexpected errors should be fully logged @logger.error( "Unexpected error occurred while uploading to Scalyr (will backoff-retry)", @@ -361,11 +368,13 @@ :backtrace => e.backtrace, :multi_event_request => multi_event_request } exc_sleep += sleep_interval exc_retries += 1 - retry if @running + retry if @running and exc_retries < @max_retries + log_retry_failure(multi_event_request, exc_data, exc_retries, exc_sleep) + next end if !exc_data.nil? message = "Retry successful after error." if exc_commonly_retried @@ -398,10 +407,27 @@ ) end end # def multi_receive + def log_retry_failure(multi_event_request, exc_data, exc_retries, exc_sleep) + message = "Failed to send #{multi_event_request[:logstash_events].length} events after #{exc_retries} tries." 
+ sample_events = Array.new + multi_event_request[:logstash_events][0,5].each {|l_event| + sample_events << Scalyr::Common::Util.truncate(l_event.to_hash.to_json, 256) + } + @logger.error(message, :error_data => exc_data, :sample_events => sample_events, :retries => exc_retries, :sleep_time => exc_sleep) + if @dlq_writer + multi_event_request[:logstash_events].each {|l_event| + @dlq_writer.write(l_event, "#{exc_data[:message]}") + } + else + @logger.warn("Dead letter queue not configured, dropping #{multi_event_request[:logstash_events].length} events after #{exc_retries} tries.", :sample_events => sample_events) + end + end + + + # Builds an array of multi-event requests from LogStash events # Each array element is a request that groups multiple events (to be posted to Scalyr's addEvents endpoint) # # This function also performs data transformations to support special fields and, optionally, flatten JSON values. # @@ -426,10 +452,12 @@ total_bytes = 0 # Set of unique scalyr threads for this chunk current_threads = Hash.new # Create a Scalyr event object for each record in the chunk scalyr_events = Array.new + # Track the logstash events in each chunk to send them to the dlq in case of an error + l_events = Array.new thread_ids = Hash.new next_id = 1 #incrementing thread id for the session # per-logfile attributes @@ -617,33 +645,36 @@ end if total_bytes + add_bytes > @max_request_buffer # make sure we always have at least one event if scalyr_events.size == 0 scalyr_events << scalyr_event + l_events << l_event append_event = false end - multi_event_request = self.create_multi_event_request(scalyr_events, current_threads, logs) + multi_event_request = self.create_multi_event_request(scalyr_events, l_events, current_threads, logs) multi_event_request_array << multi_event_request total_bytes = 0 current_threads = Hash.new logs = Hash.new logs_ids = Hash.new scalyr_events = Array.new + l_events = Array.new end # if we haven't consumed the current event already # add it 
to the end of our array and keep track of the json bytesize if append_event scalyr_events << scalyr_event + l_events << l_event total_bytes += add_bytes end } # create a final request with any left over events - multi_event_request = self.create_multi_event_request(scalyr_events, current_threads, logs) + multi_event_request = self.create_multi_event_request(scalyr_events, l_events, current_threads, logs) multi_event_request_array << multi_event_request multi_event_request_array end @@ -657,11 +688,11 @@ # A request comprises multiple Scalyr Events. This function creates a request hash for # final upload to Scalyr (from an array of events, and an optional hash of current threads) # Note: The request body field will be json-encoded. - def create_multi_event_request(scalyr_events, current_threads, current_logs) + def create_multi_event_request(scalyr_events, logstash_events, current_threads, current_logs) body = { :session => @session_id + Thread.current.object_id.to_s, :token => @api_write_token, :events => scalyr_events, @@ -693,11 +724,14 @@ # We time serialization to get some insight on how long it takes to serialize the request body start_time = Time.now.to_f serialized_body = body.to_json end_time = Time.now.to_f serialization_duration = end_time - start_time - { :body => serialized_body, :record_count => scalyr_events.size, :serialization_duration => serialization_duration } + { + :body => serialized_body, :record_count => scalyr_events.size, :serialization_duration => serialization_duration, + :logstash_events => logstash_events + } end # def create_multi_event_request # Retrieve batch and other event level metric values @@ -779,18 +813,27 @@ end status_event[:attrs]['message'] = msg status_event[:attrs]['serverHost'] = @node_hostname status_event[:attrs]['parser'] = @status_parser end - multi_event_request = create_multi_event_request([status_event], nil, nil) + multi_event_request = create_multi_event_request([status_event], nil, nil, nil) begin 
@client_session.post_add_events(multi_event_request[:body], true, 0) rescue => e - @logger.warn( - "Unexpected error occurred while uploading status to Scalyr", - :error_message => e.message, - :error_class => e.class.name - ) + if e.body + @logger.warn( + "Unexpected error occurred while uploading status to Scalyr", + :error_message => e.message, + :error_class => e.class.name, + :body => Scalyr::Common::Util.truncate(e.body, 512) + ) + else + @logger.warn( + "Unexpected error occurred while uploading status to Scalyr", + :error_message => e.message, + :error_class => e.class.name + ) + end return end @last_status_transmit_time = Time.now() end