lib/connection/utils.rb in stomp-1.2.5 vs lib/connection/utils.rb in stomp-1.2.6
- old
+ new
@@ -77,11 +77,11 @@
# Heartbeats - pre connect
return unless @connect_headers[:"heart-beat"]
_validate_hbheader()
end
- # _post_connect handles low level logic just post a physical connect.
+ # _post_connect handles low level logic just after a physical connect.
def _post_connect()
return unless (@connect_headers[:"accept-version"] && @connect_headers[:host])
return if @connection_frame.command == Stomp::CMD_ERROR
# We are CONNECTed
cfh = @connection_frame.headers.symbolize_keys
@@ -96,9 +96,138 @@
# Heartbeats
return unless @connect_headers[:"heart-beat"]
_init_heartbeats()
end
- end # class
+ # socket creates and returns a new socket for use by the connection.
+ def socket()
+ @socket_semaphore.synchronize do
+ used_socket = @socket
+ used_socket = nil if closed?
-end # module
+ while used_socket.nil? || !@failure.nil?
+ @failure = nil
+ begin
+ used_socket = open_socket()
+ # Open is complete
+ connect(used_socket)
+ if @logger && @logger.respond_to?(:on_connected)
+ @logger.on_connected(log_params)
+ end
+ @connection_attempts = 0
+ rescue
+ @failure = $!
+ used_socket = nil
+ raise unless @reliable
+ raise if @failure.is_a?(Stomp::Error::LoggerConnectionError)
+ @closed = true
+ if @logger && @logger.respond_to?(:on_connectfail)
+ # on_connectfail may raise
+ begin
+ @logger.on_connectfail(log_params)
+ rescue Exception => aex
+ raise if aex.is_a?(Stomp::Error::LoggerConnectionError)
+ end
+ else
+ $stderr.print "connect to #{@host} failed: #{$!} will retry(##{@connection_attempts}) in #{@reconnect_delay}\n"
+ end
+ raise Stomp::Error::MaxReconnectAttempts if max_reconnect_attempts?
+
+ sleep(@reconnect_delay)
+
+ @connection_attempts += 1
+
+ if @parameters
+ change_host()
+ increase_reconnect_delay()
+ end
+ end
+ end
+ @socket = used_socket
+ end
+ end
+
+ # refine_params sets up defaults for a Hash initialize.
+ def refine_params(params)
+ params = params.uncamelize_and_symbolize_keys
+ default_params = {
+ :connect_headers => {},
+ :reliable => true,
+ # Failover parameters
+ :initial_reconnect_delay => 0.01,
+ :max_reconnect_delay => 30.0,
+ :use_exponential_back_off => true,
+ :back_off_multiplier => 2,
+ :max_reconnect_attempts => 0,
+ :randomize => false,
+ :connect_timeout => 0,
+ # Parse Timeout
+ :parse_timeout => 5,
+ :dmh => false,
+ # Closed check logic
+ :closed_check => true,
+ :hbser => false,
+ }
+
+ res_params = default_params.merge(params)
+ if res_params[:dmh]
+ res_params = _expand_hosts(res_params)
+ end
+ return res_params
+ end
+
+ # change_host selects the next host for retires.
+ def change_host
+ @parameters[:hosts] = @parameters[:hosts].sort_by { rand } if @parameters[:randomize]
+
+ # Set first as master and send it to the end of array
+ current_host = @parameters[:hosts].shift
+ @parameters[:hosts] << current_host
+
+ @ssl = current_host[:ssl]
+ @host = current_host[:host]
+ @port = current_host[:port] || Connection::default_port(@ssl)
+ @login = current_host[:login] || ""
+ @passcode = current_host[:passcode] || ""
+
+ end
+
+ # max_reconnect_attempts? returns nil or the number of maximum reconnect
+ # attempts.
+ def max_reconnect_attempts?
+ !(@parameters.nil? || @parameters[:max_reconnect_attempts].nil?) && @parameters[:max_reconnect_attempts] != 0 && @connection_attempts >= @parameters[:max_reconnect_attempts]
+ end
+
+ # increase_reconnect_delay increases the reconnect delay for the next connection
+ # attempt.
+ def increase_reconnect_delay
+
+ @reconnect_delay *= @parameters[:back_off_multiplier] if @parameters[:use_exponential_back_off]
+ @reconnect_delay = @parameters[:max_reconnect_delay] if @reconnect_delay > @parameters[:max_reconnect_delay]
+
+ @reconnect_delay
+ end
+
+ # __old_receive receives a frame, blocks until the frame is received.
+ def __old_receive()
+ # The receive may fail so we may need to retry.
+ while TRUE
+ begin
+ used_socket = socket
+ return _receive(used_socket)
+ rescue
+ @failure = $!
+ raise unless @reliable
+ errstr = "receive failed: #{$!}"
+ if @logger && @logger.respond_to?(:on_miscerr)
+ @logger.on_miscerr(log_params, errstr)
+ else
+ $stderr.print errstr
+ end
+ end
+ end
+ end
+
+ end # class Connection
+
+end # module Stomp