lib/scalyr/common/client.rb in logstash-output-scalyr-0.1.9 vs lib/scalyr/common/client.rb in logstash-output-scalyr-0.1.10.beta

- old
+ new

@@ -50,22 +50,27 @@ # create a new connection for every post or use a persistent connection) #--------------------------------------------------------------------------------------------------------------------- class ClientSession def initialize(logger, add_events_uri, compression_type, compression_level, - ssl_verify_peer, ssl_ca_bundle_path, ssl_verify_depth, append_builtin_cert, - record_stats_for_status, flush_quantile_estimates_on_status_send) + ssl_verify_peer, ssl_ca_bundle_path, append_builtin_cert, + record_stats_for_status, flush_quantile_estimates_on_status_send, + connect_timeout, socket_timeout, request_timeout, pool_max, pool_max_per_route) @logger = logger @add_events_uri = add_events_uri # typically /addEvents @compression_type = compression_type @compression_level = compression_level @ssl_verify_peer = ssl_verify_peer @ssl_ca_bundle_path = ssl_ca_bundle_path @append_builtin_cert = append_builtin_cert - @ssl_verify_depth = ssl_verify_depth @record_stats_for_status = record_stats_for_status @flush_quantile_estimates_on_status_send = flush_quantile_estimates_on_status_send + @connect_timeout = connect_timeout + @socket_timeout = socket_timeout + @request_timeout = request_timeout + @pool_max = pool_max + @pool_max_per_route = pool_max_per_route # A cert to use by default to avoid issues caused by the OpenSSL library not validating certs according to standard @cert_string = "" \ "-----BEGIN CERTIFICATE-----\n" \ "MIIG6zCCBNOgAwIBAgIJAM5aknNWtN6oMA0GCSqGSIb3DQEBCwUAMIGpMQswCQYD\n" \ @@ -124,15 +129,33 @@ :total_compression_duration_secs => 0, # The total duration (in seconds) it took to compress all the request bodies. # You can calculate avg compression duration by diving this value with total_requests_sent :compression_type => @compression_type, :compression_level => @compression_level, } + end # def initialize - @http = Net::HTTP::Persistent.new + def client_config + # TODO: Eventually expose some more of these as config options, though nothing here really needs tuning normally + # besides SSL + c = { + connect_timeout: @connect_timeout, + socket_timeout: @socket_timeout, + request_timeout: @request_timeout, + follow_redirects: true, + automatic_retries: 1, + retry_non_idempotent: false, + check_connection_timeout: 200, + pool_max: @pool_max, + pool_max_per_route: @pool_max_per_route, + cookies: true, + keepalive: true, + ssl: {} + } # verify peers to prevent potential MITM attacks if @ssl_verify_peer + c[:ssl][:verify] = :strict @ca_cert = Tempfile.new("ca_cert") if File.file?(@ssl_ca_bundle_path) @ca_cert.write(File.read(@ssl_ca_bundle_path)) @ca_cert.flush end @@ -140,18 +163,22 @@ open(@ca_cert.path, 'a') do |f| f.puts @cert_string end end @ca_cert.flush - @http.ca_file = @ca_cert.path - @http.verify_mode = OpenSSL::SSL::VERIFY_PEER - @http.verify_depth = @ssl_verify_depth + c[:ssl][:ca_file] = @ca_cert.path else - @http.verify_mode = OpenSSL::SSL::VERIFY_NONE + c[:ssl][:verify] = :disable end - end # def initialize + c + end + + def client + @client ||= Manticore::Client.new(client_config) + end + # Convenience method to create a fresh quantile estimator def get_new_latency_stats return { # The total number of HTTP connections successfully created. :serialization_duration_secs => Quantile::Estimator.new, # The duration (in seconds) it took to serialize (JSON dumos) all the request bodies. @@ -161,64 +188,60 @@ } end # Get a clone of current statistics hash and calculate percentiles def get_stats - current_stats = @stats.clone + @stats_lock.synchronize do + current_stats = @stats.clone - current_stats[:request_latency_p50] = @latency_stats[:request_latency_secs].query(0.5) - current_stats[:request_latency_p90] = @latency_stats[:request_latency_secs].query(0.9) - current_stats[:request_latency_p99] = @latency_stats[:request_latency_secs].query(0.99) - current_stats[:serialization_duration_secs_p50] = @latency_stats[:serialization_duration_secs].query(0.5) - current_stats[:serialization_duration_secs_p90] = @latency_stats[:serialization_duration_secs].query(0.9) - current_stats[:serialization_duration_secs_p99] = @latency_stats[:serialization_duration_secs].query(0.99) - current_stats[:compression_duration_secs_p50] = @latency_stats[:compression_duration_secs].query(0.5) - current_stats[:compression_duration_secs_p90] = @latency_stats[:compression_duration_secs].query(0.9) - current_stats[:compression_duration_secs_p99] = @latency_stats[:compression_duration_secs].query(0.99) - current_stats[:bytes_sent_p50] = @latency_stats[:bytes_sent].query(0.5) - current_stats[:bytes_sent_p90] = @latency_stats[:bytes_sent].query(0.9) - current_stats[:bytes_sent_p99] = @latency_stats[:bytes_sent].query(0.99) + current_stats[:request_latency_p50] = @latency_stats[:request_latency_secs].query(0.5) + current_stats[:request_latency_p90] = @latency_stats[:request_latency_secs].query(0.9) + current_stats[:request_latency_p99] = @latency_stats[:request_latency_secs].query(0.99) + current_stats[:serialization_duration_secs_p50] = @latency_stats[:serialization_duration_secs].query(0.5) + current_stats[:serialization_duration_secs_p90] = @latency_stats[:serialization_duration_secs].query(0.9) + current_stats[:serialization_duration_secs_p99] = @latency_stats[:serialization_duration_secs].query(0.99) + current_stats[:compression_duration_secs_p50] = @latency_stats[:compression_duration_secs].query(0.5) + current_stats[:compression_duration_secs_p90] = @latency_stats[:compression_duration_secs].query(0.9) + current_stats[:compression_duration_secs_p99] = @latency_stats[:compression_duration_secs].query(0.99) + current_stats[:bytes_sent_p50] = @latency_stats[:bytes_sent].query(0.5) + current_stats[:bytes_sent_p90] = @latency_stats[:bytes_sent].query(0.9) + current_stats[:bytes_sent_p99] = @latency_stats[:bytes_sent].query(0.99) - if @flush_quantile_estimates_on_status_send - @logger.debug "Recreating / reseting quantile estimator classes for plugin metrics" - @latency_stats = get_new_latency_stats + if @flush_quantile_estimates_on_status_send + @logger.debug "Recreating / reseting quantile estimator classes for plugin metrics" + @latency_stats = get_new_latency_stats + end + current_stats end - current_stats end # Upload data to Scalyr. Assumes that the body size complies with Scalyr limits def post_add_events(body, is_status, body_serialization_duration = 0) - post, compression_duration = prepare_post_object @add_events_uri.path, body + post_body, post_headers, compression_duration = prepare_post_object @add_events_uri.path, body fail_count = 1 # putative assume failure start_time = Time.now uncompressed_bytes_sent = 0 compressed_bytes_sent = 0 bytes_received = 0 begin - response = @http.request(@add_events_uri, post) + response = client.send(:post, @add_events_uri, body: post_body, headers: post_headers) handle_response(response) fail_count -= 1 # success means we negate the putative failure uncompressed_bytes_sent = (body.bytesize + @add_events_uri.path.bytesize) - compressed_bytes_sent = (post.body.bytesize + @add_events_uri.path.bytesize) + compressed_bytes_sent = (post_body.bytesize + @add_events_uri.path.bytesize) bytes_received = response.body.bytesize # echee: double check # echee TODO add more statistics + # TODO: Manticore doesn't raise SSL errors as this but as "UnknownExceptions", need to dig in and see if there is a + # way to detect that it is from SSL. rescue OpenSSL::SSL::SSLError => e - if @ssl_verify_peer and @ssl_ca_bundle_path.nil? and !File.file?(@ca_cert.path) - @ca_cert = Tempfile.new("ca_cert") - @ca_cert.write(@cert_string) - @ca_cert.flush - @http.ca_file = @ca_cert.path - raise ClientError.new("Packaged certificate appears to have been deleted, writing a new one.", @add_events_uri) - else - raise e - end + raise e - rescue Net::HTTP::Persistent::Error => e + rescue Manticore::ManticoreException => e # The underlying persistent-connection library automatically retries when there are network-related errors. # Eventually, it will give up and raise this generic error, at which time, we convert it to a ClientError raise ClientError.new(e.message, @add_events_uri) ensure @@ -243,11 +266,10 @@ end # def post_request def close - @http.shutdown end # def close # Prepare a post object to be sent, compressing it if necessary @@ -271,21 +293,23 @@ end end_time = Time.now.to_f compression_duration = end_time - start_time end - post = Net::HTTP::Post.new uri_path - post.add_field('Content-Type', 'application/json') - version = 'output-logstash-scalyr 0.1.9' - post.add_field('User-Agent', version + ';' + RUBY_VERSION + ';' + RUBY_PLATFORM) + version = 'output-logstash-scalyr 0.1.10.beta' + post_headers = { + 'Content-Type': 'application/json', + 'User-Agent': version + ';' + RUBY_VERSION + ';' + RUBY_PLATFORM + } + post_body = nil if not encoding.nil? - post.add_field('Content-Encoding', encoding) - post.body = compressed_body + post_headers['Content-Encoding'] = encoding + post_body = compressed_body else - post.body = body + post_body = body end - return post, compression_duration + return post_body, post_headers, compression_duration end # def prepare_post_object # Process responses from Scalyr, raising appropriate exceptions if needed