lib/scalyr/common/client.rb in logstash-output-scalyr-0.1.6 vs lib/scalyr/common/client.rb in logstash-output-scalyr-0.1.7
- old
+ new
@@ -50,19 +50,22 @@
# create a new connection for every post or use a persistent connection)
#---------------------------------------------------------------------------------------------------------------------
class ClientSession
def initialize(logger, add_events_uri, compression_type, compression_level,
- ssl_verify_peer, ssl_ca_bundle_path, ssl_verify_depth, append_builtin_cert)
+ ssl_verify_peer, ssl_ca_bundle_path, ssl_verify_depth, append_builtin_cert,
+ record_stats_for_status, flush_quantile_estimates_on_status_send)
@logger = logger
@add_events_uri = add_events_uri # typically /addEvents
@compression_type = compression_type
@compression_level = compression_level
@ssl_verify_peer = ssl_verify_peer
@ssl_ca_bundle_path = ssl_ca_bundle_path
@append_builtin_cert = append_builtin_cert
@ssl_verify_depth = ssl_verify_depth
+ @record_stats_for_status = record_stats_for_status
+ @flush_quantile_estimates_on_status_send = flush_quantile_estimates_on_status_send
# A cert to use by default to avoid issues caused by the OpenSSL library not validating certs according to standard
@cert_string = "" \
"-----BEGIN CERTIFICATE-----\n" \
"MIIG6zCCBNOgAwIBAgIJAM5aknNWtN6oMA0GCSqGSIb3DQEBCwUAMIGpMQswCQYD\n" \
@@ -104,20 +107,30 @@
"H4Z/mHoGi5SRnye+Wo+jyiQiWjJQ5LrlQPbHmuO0tLs9lM1t9nhzLifzga5F4+o=\n" \
"-----END CERTIFICATE-----"
# Request statistics are accumulated across multiple threads and must be accessed through a mutex
@stats_lock = Mutex.new
+ @latency_stats = get_new_latency_stats
@stats = {
:total_requests_sent => 0, # The total number of RPC requests sent.
:total_requests_failed => 0, # The total number of RPC requests that failed.
:total_request_bytes_sent => 0, # The total number of bytes sent over the network.
:total_compressed_request_bytes_sent => 0, # The total number of compressed bytes sent over the network
:total_response_bytes_received => 0, # The total number of bytes received.
:total_request_latency_secs => 0, # The total number of secs spent waiting for a response (so average latency
# can be calculated by dividing this number by @total_requests_sent).
# This includes connection establishment time.
:total_connections_created => 0, # The total number of HTTP connections successfully created.
+ :total_serialization_duration_secs => 0, # The total duration (in seconds) it took to serialize (JSON dump) all the request bodies.
+ # You can calculate the average serialization duration by dividing this value by total_requests_sent.
+ :total_compression_duration_secs => 0, # The total duration (in seconds) it took to compress all the request bodies.
+ # You can calculate the average compression duration by dividing this value by total_requests_sent.
+ :total_flatten_values_duration_secs => 0, # The total duration (in seconds) it took to flatten nested record values.
+ # If flattening is disabled, this value will always be 0. The average per-request value can be inferred by dividing
+ # this value by total_requests_sent.
+ :compression_type => @compression_type,
+ :compression_level => @compression_level,
}
@http = Net::HTTP::Persistent.new
# verify peers to prevent potential MITM attacks
@@ -139,22 +152,54 @@
else
@http.verify_mode = OpenSSL::SSL::VERIFY_NONE
end
end # def initialize
+ # Convenience method to create a fresh set of per-request quantile estimators
+ def get_new_latency_stats
+ return {
+ :serialization_duration_secs => Quantile::Estimator.new, # The duration (in seconds) it took to serialize (JSON dump) a request body.
+ :compression_duration_secs => Quantile::Estimator.new, # The duration (in seconds) it took to compress a request body.
+ :flatten_values_duration_secs => Quantile::Estimator.new, # The duration (in seconds) it took to flatten nested record values.
+ # If flattening is disabled, this value will always be 0.
+ :request_latency_secs => Quantile::Estimator.new, # Seconds spent waiting for a response. This includes connection establishment time.
+ :bytes_sent => Quantile::Estimator.new # The number of bytes sent over the network (roughly the batch size plus a small amount of overhead).
+ }
+ end
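
For reference, each entry above is a streaming quantile estimator from the quantile gem. A minimal standalone sketch of the observe/query pattern these stats rely on (the sample latencies are made up for illustration):

require 'quantile'

estimator = Quantile::Estimator.new
# Feed in observed per-request latencies (illustrative values, in seconds).
[0.12, 0.34, 0.08, 1.2, 0.05].each { |latency| estimator.observe(latency) }

puts estimator.query(0.5)   # median
puts estimator.query(0.99)  # 99th percentile
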
-
# Get a clone of current statistics hash
def get_stats
- @stats.clone
+ current_stats = @stats.clone
+
+ current_stats[:request_latency_p50] = @latency_stats[:request_latency_secs].query(0.5)
+ current_stats[:request_latency_p90] = @latency_stats[:request_latency_secs].query(0.9)
+ current_stats[:request_latency_p99] = @latency_stats[:request_latency_secs].query(0.99)
+ current_stats[:serialization_duration_secs_p50] = @latency_stats[:serialization_duration_secs].query(0.5)
+ current_stats[:serialization_duration_secs_p90] = @latency_stats[:serialization_duration_secs].query(0.9)
+ current_stats[:serialization_duration_secs_p99] = @latency_stats[:serialization_duration_secs].query(0.99)
+ current_stats[:flatten_values_duration_secs_p50] = @latency_stats[:flatten_values_duration_secs].query(0.5)
+ current_stats[:flatten_values_duration_secs_p90] = @latency_stats[:flatten_values_duration_secs].query(0.9)
+ current_stats[:flatten_values_duration_secs_p99] = @latency_stats[:flatten_values_duration_secs].query(0.99)
+ current_stats[:compression_duration_secs_p50] = @latency_stats[:compression_duration_secs].query(0.5)
+ current_stats[:compression_duration_secs_p90] = @latency_stats[:compression_duration_secs].query(0.9)
+ current_stats[:compression_duration_secs_p99] = @latency_stats[:compression_duration_secs].query(0.99)
+ current_stats[:bytes_sent_p50] = @latency_stats[:bytes_sent].query(0.5)
+ current_stats[:bytes_sent_p90] = @latency_stats[:bytes_sent].query(0.9)
+ current_stats[:bytes_sent_p99] = @latency_stats[:bytes_sent].query(0.99)
+
+ if @flush_quantile_estimates_on_status_send
+ @latency_stats = get_new_latency_stats
+ end
+ current_stats
end
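
To illustrate the new keys exposed here, a hypothetical consumer of get_stats (with client assumed to be an already constructed ClientSession) could read both the cumulative totals and the derived percentiles:

stats = client.get_stats

# Average latency derived from the cumulative counters, guarding against division by zero.
avg_latency = stats[:total_requests_sent] > 0 ? stats[:total_request_latency_secs] / stats[:total_requests_sent] : 0

puts "avg request latency: #{avg_latency}s"
puts "p50 request latency: #{stats[:request_latency_p50]}s"
puts "p99 request latency: #{stats[:request_latency_p99]}s"

Note that when flush_quantile_estimates_on_status_send is enabled, the estimators are reset on each call, so the reported percentiles only cover the window since the previous status send.
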
# Upload data to Scalyr. Assumes that the body size complies with Scalyr limits
- def post_add_events(body)
- post = prepare_post_object @add_events_uri.path, body
+ def post_add_events(body, is_status, body_serialization_duration = 0, flatten_nested_values_duration = 0)
+ post, compression_duration = prepare_post_object @add_events_uri.path, body
fail_count = 1 # assume failure until the request succeeds
start_time = Time.now
uncompressed_bytes_sent = 0
compressed_bytes_sent = 0
bytes_received = 0
@@ -183,20 +228,29 @@
# The underlying persistent-connection library automatically retries when there are network-related errors.
# Eventually, it will give up and raise this generic error, at which time, we convert it to a ClientError
raise ClientError.new(e.message, @add_events_uri)
ensure
- @stats_lock.synchronize do
- @stats[:total_requests_sent] += 1
- @stats[:total_requests_failed] += fail_count
- @stats[:total_request_bytes_sent] += uncompressed_bytes_sent
- @stats[:total_compressed_request_bytes_sent] += compressed_bytes_sent
- @stats[:total_response_bytes_received] += bytes_received
- end_time = Time.now
- @stats[:total_request_latency_secs] += (end_time - start_time)
+ if @record_stats_for_status or !is_status
+ @stats_lock.synchronize do
+ @stats[:total_requests_sent] += 1
+ @stats[:total_requests_failed] += fail_count
+ @stats[:total_request_bytes_sent] += uncompressed_bytes_sent
+ @stats[:total_compressed_request_bytes_sent] += compressed_bytes_sent
+ @stats[:total_response_bytes_received] += bytes_received
+ @stats[:total_serialization_duration_secs] += body_serialization_duration
+ @stats[:total_flatten_values_duration_secs] += flatten_nested_values_duration
+ @stats[:total_compression_duration_secs] += compression_duration
+ end_time = Time.now
+ @stats[:total_request_latency_secs] += (end_time - start_time)
+ @latency_stats[:request_latency_secs].observe(end_time - start_time)
+ @latency_stats[:serialization_duration_secs].observe(body_serialization_duration)
+ @latency_stats[:flatten_values_duration_secs].observe(flatten_nested_values_duration)
+ @latency_stats[:compression_duration_secs].observe(compression_duration)
+ @latency_stats[:bytes_sent].observe(uncompressed_bytes_sent)
+ end
end
-
end
end # def post_add_events
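
The new signature expects the caller to measure and pass in the time spent building the body. A hedged sketch of what such a call site could look like (client and events are assumed to exist, and no nested-value flattening is performed here):

require 'json'

# Time the JSON serialization of the batch and hand the duration to the client.
serialization_start = Time.now.to_f
body = events.to_json
serialization_duration = Time.now.to_f - serialization_start

# is_status = false for a regular event batch; the flattening duration is 0 in this sketch.
client.post_add_events(body, false, serialization_duration, 0)
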
@@ -209,11 +263,13 @@
# Prepare a post object to be sent, compressing it if necessary
private
def prepare_post_object(uri_path, body)
# use compression if enabled
encoding = nil
+ compression_duration = 0
if @compression_type
+ start_time = Time.now.to_f
if @compression_type == 'deflate'
encoding = 'deflate'
compressed_body = Zlib::Deflate.deflate(body, @compression_level)
elsif @compression_type == 'bz2'
encoding = 'bz2'
@@ -221,23 +277,25 @@
bz2 = RBzip2.default_adapter::Compressor.new io
bz2.write body
bz2.close
compressed_body = io.string
end
+ end_time = Time.now.to_f
+ compression_duration = end_time - start_time
end
post = Net::HTTP::Post.new uri_path
post.add_field('Content-Type', 'application/json')
- version = 'output-logstash-scalyr 0.1.6'
+ version = 'output-logstash-scalyr 0.1.7'
post.add_field('User-Agent', version + ';' + RUBY_VERSION + ';' + RUBY_PLATFORM)
if not encoding.nil?
post.add_field('Content-Encoding', encoding)
post.body = compressed_body
else
post.body = body
end
- post
+ return post, compression_duration
end # def prepare_post_object
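
The deflate branch and the new timing can be exercised in isolation. A minimal standalone sketch (the payload and compression level are illustrative):

require 'zlib'

body = '{"events": []}'
start_time = Time.now.to_f
compressed_body = Zlib::Deflate.deflate(body, Zlib::DEFAULT_COMPRESSION)
compression_duration = Time.now.to_f - start_time

puts "#{body.bytesize} -> #{compressed_body.bytesize} bytes compressed in #{compression_duration} seconds"
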
# Process responses from Scalyr, raising appropriate exceptions if needed