lib/connection/utils.rb in stomp-1.2.5 vs lib/connection/utils.rb in stomp-1.2.6

- old
+ new

@@ -77,11 +77,11 @@ # Heartbeats - pre connect return unless @connect_headers[:"heart-beat"] _validate_hbheader() end - # _post_connect handles low level logic just post a physical connect. + # _post_connect handles low level logic just after a physical connect. def _post_connect() return unless (@connect_headers[:"accept-version"] && @connect_headers[:host]) return if @connection_frame.command == Stomp::CMD_ERROR # We are CONNECTed cfh = @connection_frame.headers.symbolize_keys @@ -96,9 +96,138 @@ # Heartbeats return unless @connect_headers[:"heart-beat"] _init_heartbeats() end - end # class + # socket creates and returns a new socket for use by the connection. + def socket() + @socket_semaphore.synchronize do + used_socket = @socket + used_socket = nil if closed? -end # module + while used_socket.nil? || !@failure.nil? + @failure = nil + begin + used_socket = open_socket() + # Open is complete + connect(used_socket) + if @logger && @logger.respond_to?(:on_connected) + @logger.on_connected(log_params) + end + @connection_attempts = 0 + rescue + @failure = $! + used_socket = nil + raise unless @reliable + raise if @failure.is_a?(Stomp::Error::LoggerConnectionError) + @closed = true + if @logger && @logger.respond_to?(:on_connectfail) + # on_connectfail may raise + begin + @logger.on_connectfail(log_params) + rescue Exception => aex + raise if aex.is_a?(Stomp::Error::LoggerConnectionError) + end + else + $stderr.print "connect to #{@host} failed: #{$!} will retry(##{@connection_attempts}) in #{@reconnect_delay}\n" + end + raise Stomp::Error::MaxReconnectAttempts if max_reconnect_attempts? + + sleep(@reconnect_delay) + + @connection_attempts += 1 + + if @parameters + change_host() + increase_reconnect_delay() + end + end + end + @socket = used_socket + end + end + + # refine_params sets up defaults for a Hash initialize. + def refine_params(params) + params = params.uncamelize_and_symbolize_keys + default_params = { + :connect_headers => {}, + :reliable => true, + # Failover parameters + :initial_reconnect_delay => 0.01, + :max_reconnect_delay => 30.0, + :use_exponential_back_off => true, + :back_off_multiplier => 2, + :max_reconnect_attempts => 0, + :randomize => false, + :connect_timeout => 0, + # Parse Timeout + :parse_timeout => 5, + :dmh => false, + # Closed check logic + :closed_check => true, + :hbser => false, + } + + res_params = default_params.merge(params) + if res_params[:dmh] + res_params = _expand_hosts(res_params) + end + return res_params + end + + # change_host selects the next host for retires. + def change_host + @parameters[:hosts] = @parameters[:hosts].sort_by { rand } if @parameters[:randomize] + + # Set first as master and send it to the end of array + current_host = @parameters[:hosts].shift + @parameters[:hosts] << current_host + + @ssl = current_host[:ssl] + @host = current_host[:host] + @port = current_host[:port] || Connection::default_port(@ssl) + @login = current_host[:login] || "" + @passcode = current_host[:passcode] || "" + + end + + # max_reconnect_attempts? returns nil or the number of maximum reconnect + # attempts. + def max_reconnect_attempts? + !(@parameters.nil? || @parameters[:max_reconnect_attempts].nil?) && @parameters[:max_reconnect_attempts] != 0 && @connection_attempts >= @parameters[:max_reconnect_attempts] + end + + # increase_reconnect_delay increases the reconnect delay for the next connection + # attempt. + def increase_reconnect_delay + + @reconnect_delay *= @parameters[:back_off_multiplier] if @parameters[:use_exponential_back_off] + @reconnect_delay = @parameters[:max_reconnect_delay] if @reconnect_delay > @parameters[:max_reconnect_delay] + + @reconnect_delay + end + + # __old_receive receives a frame, blocks until the frame is received. + def __old_receive() + # The receive may fail so we may need to retry. + while TRUE + begin + used_socket = socket + return _receive(used_socket) + rescue + @failure = $! + raise unless @reliable + errstr = "receive failed: #{$!}" + if @logger && @logger.respond_to?(:on_miscerr) + @logger.on_miscerr(log_params, errstr) + else + $stderr.print errstr + end + end + end + end + + end # class Connection + +end # module Stomp