lib/scalyr/common/client.rb in logstash-output-scalyr-0.1.9 vs lib/scalyr/common/client.rb in logstash-output-scalyr-0.1.10.beta
- old
+ new
@@ -50,22 +50,27 @@
# create a new connection for every post or use a persistent connection)
#---------------------------------------------------------------------------------------------------------------------
class ClientSession
def initialize(logger, add_events_uri, compression_type, compression_level,
- ssl_verify_peer, ssl_ca_bundle_path, ssl_verify_depth, append_builtin_cert,
- record_stats_for_status, flush_quantile_estimates_on_status_send)
+ ssl_verify_peer, ssl_ca_bundle_path, append_builtin_cert,
+ record_stats_for_status, flush_quantile_estimates_on_status_send,
+ connect_timeout, socket_timeout, request_timeout, pool_max, pool_max_per_route)
@logger = logger
@add_events_uri = add_events_uri # typically /addEvents
@compression_type = compression_type
@compression_level = compression_level
@ssl_verify_peer = ssl_verify_peer
@ssl_ca_bundle_path = ssl_ca_bundle_path
@append_builtin_cert = append_builtin_cert
- @ssl_verify_depth = ssl_verify_depth
@record_stats_for_status = record_stats_for_status
@flush_quantile_estimates_on_status_send = flush_quantile_estimates_on_status_send
+ @connect_timeout = connect_timeout
+ @socket_timeout = socket_timeout
+ @request_timeout = request_timeout
+ @pool_max = pool_max
+ @pool_max_per_route = pool_max_per_route
# A cert to use by default to avoid issues caused by the OpenSSL library not validating certs according to standard
@cert_string = "" \
"-----BEGIN CERTIFICATE-----\n" \
"MIIG6zCCBNOgAwIBAgIJAM5aknNWtN6oMA0GCSqGSIb3DQEBCwUAMIGpMQswCQYD\n" \
@@ -124,15 +129,33 @@
:total_compression_duration_secs => 0, # The total duration (in seconds) it took to compress all the request bodies.
# You can calculate avg compression duration by dividing this value by total_requests_sent
:compression_type => @compression_type,
:compression_level => @compression_level,
}
+ end # def initialize
- @http = Net::HTTP::Persistent.new
+ def client_config
+ # TODO: Eventually expose some more of these as config options, though nothing here really needs tuning normally
+ # besides SSL
+ c = {
+ connect_timeout: @connect_timeout,
+ socket_timeout: @socket_timeout,
+ request_timeout: @request_timeout,
+ follow_redirects: true,
+ automatic_retries: 1,
+ retry_non_idempotent: false,
+ check_connection_timeout: 200,
+ pool_max: @pool_max,
+ pool_max_per_route: @pool_max_per_route,
+ cookies: true,
+ keepalive: true,
+ ssl: {}
+ }
# verify peers to prevent potential MITM attacks
if @ssl_verify_peer
+ c[:ssl][:verify] = :strict
@ca_cert = Tempfile.new("ca_cert")
if File.file?(@ssl_ca_bundle_path)
@ca_cert.write(File.read(@ssl_ca_bundle_path))
@ca_cert.flush
end
@@ -140,18 +163,22 @@
open(@ca_cert.path, 'a') do |f|
f.puts @cert_string
end
end
@ca_cert.flush
- @http.ca_file = @ca_cert.path
- @http.verify_mode = OpenSSL::SSL::VERIFY_PEER
- @http.verify_depth = @ssl_verify_depth
+ c[:ssl][:ca_file] = @ca_cert.path
else
- @http.verify_mode = OpenSSL::SSL::VERIFY_NONE
+ c[:ssl][:verify] = :disable
end
- end # def initialize
+ c
+ end
+
+ def client
+ @client ||= Manticore::Client.new(client_config)
+ end
+
# Convenience method to create a fresh quantile estimator
def get_new_latency_stats
return {
# The total number of HTTP connections successfully created.
:serialization_duration_secs => Quantile::Estimator.new, # The duration (in seconds) it took to serialize (JSON dumps) all the request bodies.
@@ -161,64 +188,60 @@
}
end
# Get a clone of current statistics hash and calculate percentiles
def get_stats
- current_stats = @stats.clone
+ @stats_lock.synchronize do
+ current_stats = @stats.clone
- current_stats[:request_latency_p50] = @latency_stats[:request_latency_secs].query(0.5)
- current_stats[:request_latency_p90] = @latency_stats[:request_latency_secs].query(0.9)
- current_stats[:request_latency_p99] = @latency_stats[:request_latency_secs].query(0.99)
- current_stats[:serialization_duration_secs_p50] = @latency_stats[:serialization_duration_secs].query(0.5)
- current_stats[:serialization_duration_secs_p90] = @latency_stats[:serialization_duration_secs].query(0.9)
- current_stats[:serialization_duration_secs_p99] = @latency_stats[:serialization_duration_secs].query(0.99)
- current_stats[:compression_duration_secs_p50] = @latency_stats[:compression_duration_secs].query(0.5)
- current_stats[:compression_duration_secs_p90] = @latency_stats[:compression_duration_secs].query(0.9)
- current_stats[:compression_duration_secs_p99] = @latency_stats[:compression_duration_secs].query(0.99)
- current_stats[:bytes_sent_p50] = @latency_stats[:bytes_sent].query(0.5)
- current_stats[:bytes_sent_p90] = @latency_stats[:bytes_sent].query(0.9)
- current_stats[:bytes_sent_p99] = @latency_stats[:bytes_sent].query(0.99)
+ current_stats[:request_latency_p50] = @latency_stats[:request_latency_secs].query(0.5)
+ current_stats[:request_latency_p90] = @latency_stats[:request_latency_secs].query(0.9)
+ current_stats[:request_latency_p99] = @latency_stats[:request_latency_secs].query(0.99)
+ current_stats[:serialization_duration_secs_p50] = @latency_stats[:serialization_duration_secs].query(0.5)
+ current_stats[:serialization_duration_secs_p90] = @latency_stats[:serialization_duration_secs].query(0.9)
+ current_stats[:serialization_duration_secs_p99] = @latency_stats[:serialization_duration_secs].query(0.99)
+ current_stats[:compression_duration_secs_p50] = @latency_stats[:compression_duration_secs].query(0.5)
+ current_stats[:compression_duration_secs_p90] = @latency_stats[:compression_duration_secs].query(0.9)
+ current_stats[:compression_duration_secs_p99] = @latency_stats[:compression_duration_secs].query(0.99)
+ current_stats[:bytes_sent_p50] = @latency_stats[:bytes_sent].query(0.5)
+ current_stats[:bytes_sent_p90] = @latency_stats[:bytes_sent].query(0.9)
+ current_stats[:bytes_sent_p99] = @latency_stats[:bytes_sent].query(0.99)
- if @flush_quantile_estimates_on_status_send
- @logger.debug "Recreating / reseting quantile estimator classes for plugin metrics"
- @latency_stats = get_new_latency_stats
+ if @flush_quantile_estimates_on_status_send
+ @logger.debug "Recreating / reseting quantile estimator classes for plugin metrics"
+ @latency_stats = get_new_latency_stats
+ end
+ current_stats
end
- current_stats
end
# Upload data to Scalyr. Assumes that the body size complies with Scalyr limits
def post_add_events(body, is_status, body_serialization_duration = 0)
- post, compression_duration = prepare_post_object @add_events_uri.path, body
+ post_body, post_headers, compression_duration = prepare_post_object @add_events_uri.path, body
fail_count = 1 # putative assume failure
start_time = Time.now
uncompressed_bytes_sent = 0
compressed_bytes_sent = 0
bytes_received = 0
begin
- response = @http.request(@add_events_uri, post)
+ response = client.send(:post, @add_events_uri, body: post_body, headers: post_headers)
handle_response(response)
fail_count -= 1 # success means we negate the putative failure
uncompressed_bytes_sent = (body.bytesize + @add_events_uri.path.bytesize)
- compressed_bytes_sent = (post.body.bytesize + @add_events_uri.path.bytesize)
+ compressed_bytes_sent = (post_body.bytesize + @add_events_uri.path.bytesize)
bytes_received = response.body.bytesize # echee: double check
# echee TODO add more statistics
+ # TODO: Manticore doesn't raise SSL errors as OpenSSL::SSL::SSLError but as "UnknownExceptions"; need to dig in and
+ # see if there is a way to detect that such a failure originated from SSL.
rescue OpenSSL::SSL::SSLError => e
- if @ssl_verify_peer and @ssl_ca_bundle_path.nil? and !File.file?(@ca_cert.path)
- @ca_cert = Tempfile.new("ca_cert")
- @ca_cert.write(@cert_string)
- @ca_cert.flush
- @http.ca_file = @ca_cert.path
- raise ClientError.new("Packaged certificate appears to have been deleted, writing a new one.", @add_events_uri)
- else
- raise e
- end
+ raise e
- rescue Net::HTTP::Persistent::Error => e
+ rescue Manticore::ManticoreException => e
# The underlying persistent-connection library automatically retries when there are network-related errors.
# Eventually, it will give up and raise this generic error, at which time, we convert it to a ClientError
raise ClientError.new(e.message, @add_events_uri)
ensure
@@ -243,11 +266,10 @@
end # def post_request
def close
- @http.shutdown
end # def close
# Prepare a post object to be sent, compressing it if necessary
@@ -271,21 +293,23 @@
end
end_time = Time.now.to_f
compression_duration = end_time - start_time
end
- post = Net::HTTP::Post.new uri_path
- post.add_field('Content-Type', 'application/json')
- version = 'output-logstash-scalyr 0.1.9'
- post.add_field('User-Agent', version + ';' + RUBY_VERSION + ';' + RUBY_PLATFORM)
+ version = 'output-logstash-scalyr 0.1.10.beta'
+ post_headers = {
+ 'Content-Type': 'application/json',
+ 'User-Agent': version + ';' + RUBY_VERSION + ';' + RUBY_PLATFORM
+ }
+ post_body = nil
if not encoding.nil?
- post.add_field('Content-Encoding', encoding)
- post.body = compressed_body
+ post_headers['Content-Encoding'] = encoding
+ post_body = compressed_body
else
- post.body = body
+ post_body = body
end
- return post, compression_duration
+ return post_body, post_headers, compression_duration
end # def prepare_post_object
# Process responses from Scalyr, raising appropriate exceptions if needed