lib/amqp/client/connection.rb in amqp-client-1.1.2 vs lib/amqp/client/connection.rb in amqp-client-1.1.3

- old
+ new

@@ -22,10 +22,11 @@ # @option options [Integer] heartbeat (0) Heartbeat timeout, defaults to 0 and relies on TCP keepalive instead # @option options [Integer] frame_max (131_072) Maximum frame size, # the smallest of the client's and the broker's values will be used # @option options [Integer] channel_max (2048) Maxium number of channels the client will be allowed to have open. # Maxium allowed is 65_536. The smallest of the client's and the broker's value will be used. + # @option options [String] keepalive (60:10:3) TCP keepalive setting, 60s idle, 10s interval between probes, 3 probes # @return [Connection] def initialize(uri = "", read_loop_thread: true, **options) uri = URI.parse(uri) tls = uri.scheme == "amqps" port = port_from_env || uri.port || (tls ? 5671 : 5672) @@ -381,11 +382,12 @@ # @return [Socket] # @return [OpenSSL::SSL::SSLSocket] def open_socket(host, port, tls, options) connect_timeout = options.fetch(:connect_timeout, 30).to_i socket = Socket.tcp host, port, connect_timeout: connect_timeout - enable_tcp_keepalive(socket) + keepalive = options.fetch(:keepalive, "").split(":", 3).map!(&:to_i) + enable_tcp_keepalive(socket, *keepalive) if tls cert_store = OpenSSL::X509::Store.new cert_store.set_default_paths context = OpenSSL::SSL::SSLContext.new context.cert_store = cert_store @@ -396,10 +398,12 @@ socket.hostname = host # SNI host socket.connect socket.post_connection_check(host) || raise(Error, "TLS certificate hostname doesn't match requested") end socket + rescue SystemCallError, OpenSSL::OpenSSLError => e + raise Error, "Could not open a socket: #{e.message}" end # Negotiate a connection # @return [Array<Integer, Integer, Integer>] channel_max, frame_max, heartbeat def establish(socket, user, password, vhost, options) @@ -460,14 +464,21 @@ raise e end # Enable TCP keepalive, which is prefered to heartbeats # @return [void] - def enable_tcp_keepalive(socket) + def enable_tcp_keepalive(socket, idle = 60, interval = 10, count = 3) socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true) - socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, 60) - socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, 10) - socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, 3) + if Socket.const_defined?(:TCP_KEEPIDLE) # linux/bsd + socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPIDLE, idle) + elsif RUBY_PLATFORM.include? "darwin" # os x + # https://www.quickhack.net/nom/blog/2018-01-19-enable-tcp-keepalive-of-macos-and-linux-in-ruby.html + socket.setsockopt(Socket::IPPROTO_TCP, 0x10, idle) + else # windows + return + end + socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPINTVL, interval) + socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPCNT, count) rescue StandardError => e warn "AMQP-Client could not enable TCP keepalive on socket. #{e.inspect}" end # Fetch the AMQP port number from ENV