lib/amqp/client/connection.rb in amqp-client-1.1.2 vs lib/amqp/client/connection.rb in amqp-client-1.1.3
- old
+ new
@@ -22,10 +22,11 @@
# @option options [Integer] heartbeat (0) Heartbeat timeout, defaults to 0 and relies on TCP keepalive instead
# @option options [Integer] frame_max (131_072) Maximum frame size,
# the smallest of the client's and the broker's values will be used
# @option options [Integer] channel_max (2048) Maxium number of channels the client will be allowed to have open.
# Maxium allowed is 65_536. The smallest of the client's and the broker's value will be used.
+ # @option options [String] keepalive (60:10:3) TCP keepalive setting, 60s idle, 10s interval between probes, 3 probes
# @return [Connection]
def initialize(uri = "", read_loop_thread: true, **options)
uri = URI.parse(uri)
tls = uri.scheme == "amqps"
port = port_from_env || uri.port || (tls ? 5671 : 5672)
@@ -381,11 +382,12 @@
# @return [Socket]
# @return [OpenSSL::SSL::SSLSocket]
def open_socket(host, port, tls, options)
connect_timeout = options.fetch(:connect_timeout, 30).to_i
socket = Socket.tcp host, port, connect_timeout: connect_timeout
- enable_tcp_keepalive(socket)
+ keepalive = options.fetch(:keepalive, "").split(":", 3).map!(&:to_i)
+ enable_tcp_keepalive(socket, *keepalive)
if tls
cert_store = OpenSSL::X509::Store.new
cert_store.set_default_paths
context = OpenSSL::SSL::SSLContext.new
context.cert_store = cert_store
@@ -396,10 +398,12 @@
socket.hostname = host # SNI host
socket.connect
socket.post_connection_check(host) || raise(Error, "TLS certificate hostname doesn't match requested")
end
socket
+ rescue SystemCallError, OpenSSL::OpenSSLError => e
+ raise Error, "Could not open a socket: #{e.message}"
end
# Negotiate a connection
# @return [Array<Integer, Integer, Integer>] channel_max, frame_max, heartbeat
def establish(socket, user, password, vhost, options)
@@ -460,14 +464,21 @@
raise e
end
# Enable TCP keepalive, which is prefered to heartbeats
# @return [void]
- def enable_tcp_keepalive(socket)
+ def enable_tcp_keepalive(socket, idle = 60, interval = 10, count = 3)
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
- socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, 60)
- socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, 10)
- socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, 3)
+ if Socket.const_defined?(:TCP_KEEPIDLE) # linux/bsd
+ socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPIDLE, idle)
+ elsif RUBY_PLATFORM.include? "darwin" # os x
+ # https://www.quickhack.net/nom/blog/2018-01-19-enable-tcp-keepalive-of-macos-and-linux-in-ruby.html
+ socket.setsockopt(Socket::IPPROTO_TCP, 0x10, idle)
+ else # windows
+ return
+ end
+ socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPINTVL, interval)
+ socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPCNT, count)
rescue StandardError => e
warn "AMQP-Client could not enable TCP keepalive on socket. #{e.inspect}"
end
# Fetch the AMQP port number from ENV