lib/scalyr/common/client.rb in logstash-output-scalyr-0.1.6 vs lib/scalyr/common/client.rb in logstash-output-scalyr-0.1.7

- old
+ new

@@ -50,19 +50,22 @@ # create a new connection for every post or use a persistent connection) #--------------------------------------------------------------------------------------------------------------------- class ClientSession def initialize(logger, add_events_uri, compression_type, compression_level, - ssl_verify_peer, ssl_ca_bundle_path, ssl_verify_depth, append_builtin_cert) + ssl_verify_peer, ssl_ca_bundle_path, ssl_verify_depth, append_builtin_cert, + record_stats_for_status, flush_quantile_estimates_on_status_send) @logger = logger @add_events_uri = add_events_uri # typically /addEvents @compression_type = compression_type @compression_level = compression_level @ssl_verify_peer = ssl_verify_peer @ssl_ca_bundle_path = ssl_ca_bundle_path @append_builtin_cert = append_builtin_cert @ssl_verify_depth = ssl_verify_depth + @record_stats_for_status = record_stats_for_status + @flush_quantile_estimates_on_status_send = flush_quantile_estimates_on_status_send # A cert to use by default to avoid issues caused by the OpenSSL library not validating certs according to standard @cert_string = "" \ "-----BEGIN CERTIFICATE-----\n" \ "MIIG6zCCBNOgAwIBAgIJAM5aknNWtN6oMA0GCSqGSIb3DQEBCwUAMIGpMQswCQYD\n" \ @@ -104,20 +107,30 @@ "H4Z/mHoGi5SRnye+Wo+jyiQiWjJQ5LrlQPbHmuO0tLs9lM1t9nhzLifzga5F4+o=\n" \ "-----END CERTIFICATE-----" # Request statistics are accumulated across multiple threads and must be accessed through a mutex @stats_lock = Mutex.new + @latency_stats = get_new_latency_stats @stats = { :total_requests_sent => 0, # The total number of RPC requests sent. :total_requests_failed => 0, # The total number of RPC requests that failed. :total_request_bytes_sent => 0, # The total number of bytes sent over the network. :total_compressed_request_bytes_sent => 0, # The total number of compressed bytes sent over the network :total_response_bytes_received => 0, # The total number of bytes received. :total_request_latency_secs => 0, # The total number of secs spent waiting for a responses (so average latency # can be calculated by dividing this number by @total_requests_sent). # This includes connection establishment time. :total_connections_created => 0, # The total number of HTTP connections successfully created. + :total_serialization_duration_secs => 0, # The total duration (in seconds) it took to serialize (JSON dumos) all the request bodies. + # You can calculate avg compression duration by diving this value with total_requests_sent + :total_compression_duration_secs => 0, # The total duration (in seconds) it took to compress all the request bodies. + # You can calculate avg compression duration by diving this value with total_requests_sent + :total_flatten_values_duration_secs => 0, # The total duration (in seconds) it took to flatten nested record values. + # In case flattening is disabled, this value will always be 0. Can infer average per-request value by dividing this + # value by total_requests_sent + :compression_type => @compression_type, + :compression_level => @compression_level, } @http = Net::HTTP::Persistent.new # verify peers to prevent potential MITM attacks @@ -139,22 +152,54 @@ else @http.verify_mode = OpenSSL::SSL::VERIFY_NONE end end # def initialize + # Convenience method to create a fresh quantile estimator + def get_new_latency_stats + return { + # The total number of HTTP connections successfully created. + :serialization_duration_secs => Quantile::Estimator.new, # The duration (in seconds) it took to serialize (JSON dumos) all the request bodies. + :compression_duration_secs => Quantile::Estimator.new, # The duration (in seconds) it took to compress all the request bodies. + :flatten_values_duration_secs => Quantile::Estimator.new, # The duration (in seconds) it took to flatten nested record values. + # In case flattening is disabled, this value will always be 0. + :request_latency_secs => Quantile::Estimator.new, # Secs spent waiting for a responses. This includes connection establishment time. + :bytes_sent => Quantile::Estimator.new # The number of bytes sent over the network. Batch size with a bit more overhead. + } + end - # Get a clone of current statistics hash def get_stats - @stats.clone + current_stats = @stats.clone + + current_stats[:request_latency_p50] = @latency_stats[:request_latency_secs].query(0.5) + current_stats[:request_latency_p90] = @latency_stats[:request_latency_secs].query(0.9) + current_stats[:request_latency_p99] = @latency_stats[:request_latency_secs].query(0.99) + current_stats[:serialization_duration_secs_p50] = @latency_stats[:serialization_duration_secs].query(0.5) + current_stats[:serialization_duration_secs_p90] = @latency_stats[:serialization_duration_secs].query(0.9) + current_stats[:serialization_duration_secs_p99] = @latency_stats[:serialization_duration_secs].query(0.99) + current_stats[:flatten_values_duration_secs_p50] = @latency_stats[:flatten_values_duration_secs].query(0.5) + current_stats[:flatten_values_duration_secs_p90] = @latency_stats[:flatten_values_duration_secs].query(0.9) + current_stats[:flatten_values_duration_secs_p99] = @latency_stats[:flatten_values_duration_secs].query(0.99) + current_stats[:compression_duration_secs_p50] = @latency_stats[:compression_duration_secs].query(0.5) + current_stats[:compression_duration_secs_p90] = @latency_stats[:compression_duration_secs].query(0.9) + current_stats[:compression_duration_secs_p99] = @latency_stats[:compression_duration_secs].query(0.99) + current_stats[:bytes_sent_p50] = @latency_stats[:bytes_sent].query(0.5) + current_stats[:bytes_sent_p90] = @latency_stats[:bytes_sent].query(0.9) + current_stats[:bytes_sent_p99] = @latency_stats[:bytes_sent].query(0.99) + + if @flush_quantile_estimates_on_status_send + @latency_stats = get_new_latency_stats + end + current_stats end # Upload data to Scalyr. Assumes that the body size complies with Scalyr limits - def post_add_events(body) - post = prepare_post_object @add_events_uri.path, body + def post_add_events(body, is_status, body_serialization_duration = 0, flatten_nested_values_duration = 0) + post, compression_duration = prepare_post_object @add_events_uri.path, body fail_count = 1 # putative assume failure start_time = Time.now uncompressed_bytes_sent = 0 compressed_bytes_sent = 0 bytes_received = 0 @@ -183,20 +228,29 @@ # The underlying persistent-connection library automatically retries when there are network-related errors. # Eventually, it will give up and raise this generic error, at which time, we convert it to a ClientError raise ClientError.new(e.message, @add_events_uri) ensure - @stats_lock.synchronize do - @stats[:total_requests_sent] += 1 - @stats[:total_requests_failed] += fail_count - @stats[:total_request_bytes_sent] += uncompressed_bytes_sent - @stats[:total_compressed_request_bytes_sent] += compressed_bytes_sent - @stats[:total_response_bytes_received] += bytes_received - end_time = Time.now - @stats[:total_request_latency_secs] += (end_time - start_time) + if @record_stats_for_status or !is_status + @stats_lock.synchronize do + @stats[:total_requests_sent] += 1 + @stats[:total_requests_failed] += fail_count + @stats[:total_request_bytes_sent] += uncompressed_bytes_sent + @stats[:total_compressed_request_bytes_sent] += compressed_bytes_sent + @stats[:total_response_bytes_received] += bytes_received + @stats[:total_serialization_duration_secs] += body_serialization_duration + @stats[:total_flatten_values_duration_secs] += flatten_nested_values_duration + @stats[:total_compression_duration_secs] += compression_duration + end_time = Time.now + @stats[:total_request_latency_secs] += (end_time - start_time) + @latency_stats[:request_latency_secs].observe(end_time - start_time) + @latency_stats[:serialization_duration_secs].observe(body_serialization_duration) + @latency_stats[:flatten_values_duration_secs].observe(flatten_nested_values_duration) + @latency_stats[:compression_duration_secs].observe(compression_duration) + @latency_stats[:bytes_sent].observe(uncompressed_bytes_sent) + end end - end end # def post_request @@ -209,11 +263,13 @@ # Prepare a post object to be sent, compressing it if necessary private def prepare_post_object(uri_path, body) # use compression if enabled encoding = nil + compression_duration = 0 if @compression_type + start_time = Time.now.to_f if @compression_type == 'deflate' encoding = 'deflate' compressed_body = Zlib::Deflate.deflate(body, @compression_level) elsif @compression_type == 'bz2' encoding = 'bz2' @@ -221,23 +277,25 @@ bz2 = RBzip2.default_adapter::Compressor.new io bz2.write body bz2.close compressed_body = io.string end + end_time = Time.now.to_f + compression_duration = end_time - start_time end post = Net::HTTP::Post.new uri_path post.add_field('Content-Type', 'application/json') - version = 'output-logstash-scalyr 0.1.6' + version = 'output-logstash-scalyr 0.1.7' post.add_field('User-Agent', version + ';' + RUBY_VERSION + ';' + RUBY_PLATFORM) if not encoding.nil? post.add_field('Content-Encoding', encoding) post.body = compressed_body else post.body = body end - post + return post, compression_duration end # def prepare_post_object # Process responses from Scalyr, raising appropriate exceptions if needed