lib/logstash/outputs/scalyr.rb in logstash-output-scalyr-0.1.13 vs lib/logstash/outputs/scalyr.rb in logstash-output-scalyr-0.1.14.beta
- old
+ new
@@ -76,10 +76,14 @@
config :flat_tag_prefix, :validate => :string, :default => 'tag_'
config :flat_tag_value, :default => 1
# Initial interval in seconds between bulk retries. Doubled on each retry up to `retry_max_interval`
config :retry_initial_interval, :validate => :number, :default => 1
+ # How many times to retry sending an event before giving up on it
+ config :max_retries, :validate => :number, :default => 5
+ # Whether or not to send messages that failed to send after max_retries attempts to the DLQ, or to just drop them
+ config :send_to_dlq, :validate => :boolean, :default => true
# Set max interval in seconds between bulk retries.
config :retry_max_interval, :validate => :number, :default => 64
# Whether or not to verify the connection to Scalyr, only set to false for debugging.
@@ -317,10 +321,11 @@
sleep_interval = sleep_for(sleep_interval)
exc_sleep += sleep_interval
exc_retries += 1
message = "Error uploading to Scalyr (will backoff-retry)"
exc_data = {
+ :error_class => e.e_class,
:url => e.url.to_s,
:message => e.message,
:batch_num => batch_num,
:total_batches => total_batches,
:record_count => multi_event_request[:record_count],
@@ -328,11 +333,11 @@
:will_retry_in_seconds => sleep_interval,
}
exc_data[:code] = e.code if e.code
if @logger.debug? and e.body
exc_data[:body] = e.body
- elsif e.message == "Invalid JSON response from server" and e.body
+ elsif e.body
exc_data[:body] = Scalyr::Common::Util.truncate(e.body, 512)
end
exc_data[:payload] = "\tSample payload: #{request[:body][0,1024]}..." if @logger.debug?
if e.is_commonly_retried?
# well-known retriable errors should be debug
@@ -341,11 +346,13 @@
else
# all other failed uploads should be errors
@logger.error(message, exc_data)
exc_commonly_retried = false
end
- retry if @running
+ retry if @running and exc_retries < @max_retries
+ log_retry_failure(multi_event_request, exc_data, exc_retries, exc_sleep)
+ next
rescue => e
# Any unexpected errors should be fully logged
@logger.error(
"Unexpected error occurred while uploading to Scalyr (will backoff-retry)",
@@ -361,11 +368,13 @@
:backtrace => e.backtrace,
:multi_event_request => multi_event_request
}
exc_sleep += sleep_interval
exc_retries += 1
- retry if @running
+ retry if @running and exc_retries < @max_retries
+ log_retry_failure(multi_event_request, exc_data, exc_retries, exc_sleep)
+ next
end
if !exc_data.nil?
message = "Retry successful after error."
if exc_commonly_retried
@@ -398,10 +407,27 @@
)
end
end # def multi_receive
+ # Called when a batch has exhausted its max_retries attempts: logs the
+ # failure (with up to 5 truncated sample events for debugging) and then
+ # either routes every event in the batch to the dead letter queue, or —
+ # when no DLQ writer is configured — drops the batch with a warning.
+ def log_retry_failure(multi_event_request, exc_data, exc_retries, exc_sleep)
+ message = "Failed to send #{multi_event_request[:logstash_events].length} events after #{exc_retries} tries."
+ sample_events = Array.new
+ # Only sample the first 5 events, truncated to 256 chars, to keep log volume bounded
+ multi_event_request[:logstash_events][0,5].each {|l_event|
+ sample_events << Scalyr::Common::Util.truncate(l_event.to_hash.to_json, 256)
+ }
+ @logger.error(message, :error_data => exc_data, :sample_events => sample_events, :retries => exc_retries, :sleep_time => exc_sleep)
+ if @dlq_writer
+ multi_event_request[:logstash_events].each {|l_event|
+ @dlq_writer.write(l_event, "#{exc_data[:message]}")
+ }
+ else
+ @logger.warn("Dead letter queue not configured, dropping #{multi_event_request[:logstash_events].length} events after #{exc_retries} tries.", :sample_events => sample_events)
+ end
+ end
+
+
# Builds an array of multi-event requests from LogStash events
# Each array element is a request that groups multiple events (to be posted to Scalyr's addEvents endpoint)
#
# This function also performs data transformations to support special fields and, optionally, flatten JSON values.
#
@@ -426,10 +452,12 @@
total_bytes = 0
# Set of unique scalyr threads for this chunk
current_threads = Hash.new
# Create a Scalyr event object for each record in the chunk
scalyr_events = Array.new
+ # Track the logstash events in each chunk to send them to the dlq in case of an error
+ l_events = Array.new
thread_ids = Hash.new
next_id = 1 #incrementing thread id for the session
# per-logfile attributes
@@ -617,33 +645,36 @@
end
if total_bytes + add_bytes > @max_request_buffer
# make sure we always have at least one event
if scalyr_events.size == 0
scalyr_events << scalyr_event
+ l_events << l_event
append_event = false
end
- multi_event_request = self.create_multi_event_request(scalyr_events, current_threads, logs)
+ multi_event_request = self.create_multi_event_request(scalyr_events, l_events, current_threads, logs)
multi_event_request_array << multi_event_request
total_bytes = 0
current_threads = Hash.new
logs = Hash.new
logs_ids = Hash.new
scalyr_events = Array.new
+ l_events = Array.new
end
# if we haven't consumed the current event already
# add it to the end of our array and keep track of the json bytesize
if append_event
scalyr_events << scalyr_event
+ l_events << l_event
total_bytes += add_bytes
end
}
# create a final request with any left over events
- multi_event_request = self.create_multi_event_request(scalyr_events, current_threads, logs)
+ multi_event_request = self.create_multi_event_request(scalyr_events, l_events, current_threads, logs)
multi_event_request_array << multi_event_request
multi_event_request_array
end
@@ -657,11 +688,11 @@
# A request comprises multiple Scalyr Events. This function creates a request hash for
# final upload to Scalyr (from an array of events, and an optional hash of current threads)
# Note: The request body field will be json-encoded.
- def create_multi_event_request(scalyr_events, current_threads, current_logs)
+ def create_multi_event_request(scalyr_events, logstash_events, current_threads, current_logs)
body = {
:session => @session_id + Thread.current.object_id.to_s,
:token => @api_write_token,
:events => scalyr_events,
@@ -693,11 +724,14 @@
# We time serialization to get some insight on how long it takes to serialize the request body
start_time = Time.now.to_f
serialized_body = body.to_json
end_time = Time.now.to_f
serialization_duration = end_time - start_time
- { :body => serialized_body, :record_count => scalyr_events.size, :serialization_duration => serialization_duration }
+ {
+ :body => serialized_body, :record_count => scalyr_events.size, :serialization_duration => serialization_duration,
+ :logstash_events => logstash_events
+ }
end # def create_multi_event_request
# Retrieve batch and other event level metric values
@@ -779,18 +813,27 @@
end
status_event[:attrs]['message'] = msg
status_event[:attrs]['serverHost'] = @node_hostname
status_event[:attrs]['parser'] = @status_parser
end
- multi_event_request = create_multi_event_request([status_event], nil, nil)
+ multi_event_request = create_multi_event_request([status_event], nil, nil, nil)
begin
@client_session.post_add_events(multi_event_request[:body], true, 0)
rescue => e
- @logger.warn(
- "Unexpected error occurred while uploading status to Scalyr",
- :error_message => e.message,
- :error_class => e.class.name
- )
+ if e.body
+ @logger.warn(
+ "Unexpected error occurred while uploading status to Scalyr",
+ :error_message => e.message,
+ :error_class => e.class.name,
+ :body => Scalyr::Common::Util.truncate(e.body, 512)
+ )
+ else
+ @logger.warn(
+ "Unexpected error occurred while uploading status to Scalyr",
+ :error_message => e.message,
+ :error_class => e.class.name
+ )
+ end
return
end
@last_status_transmit_time = Time.now()
end