lib/onstomp/connections/base.rb in onstomp-1.0.3 vs lib/onstomp/connections/base.rb in onstomp-1.0.4

- old
+ new

@@ -3,10 +3,11 @@ # Common behavior for all connections. class OnStomp::Connections::Base include OnStomp::Interfaces::ConnectionEvents attr_reader :version, :socket, :client attr_reader :last_transmitted_at, :last_received_at + attr_accessor :write_timeout, :read_timeout # The approximate maximum number of bytes to write per call to # {#io_process_write}. MAX_BYTES_PER_WRITE = 1024 * 8 # The maximum number of bytes to read per call to {#io_process_read} @@ -24,10 +25,13 @@ @closing = false @write_buffer = [] @read_buffer = [] @client = client @connection_up = false + @write_timeout = nil + @read_timeout = nil + setup_non_blocking_methods end # Performs any necessary configuration of the connection from the CONNECTED # frame sent by the broker and a `Hash` of pending callbacks. This method # is called after the protocol negotiation has taken place between client @@ -69,13 +73,14 @@ write_frame_nonblock connect_frame(*headers) client_con = nil until client_con io_process_write { |f| client_con ||= f } end + @last_received_at = Time.now broker_con = nil until broker_con - io_process_read { |f| broker_con ||= f } + io_process_read(true) { |f| broker_con ||= f } end raise OnStomp::ConnectFailedError if broker_con.command != 'CONNECTED' vers = broker_con.header?(:version) ? broker_con[:version] : '1.0' raise OnStomp::UnsupportedProtocolVersionError, vers unless client.versions.include?(vers) @connection_up = true @@ -91,10 +96,24 @@ else super end end + # Number of milliseconds since data was last transmitted to the broker or + # `nil` if no data has been transmitted when the method is called. + # @return [Fixnum, nil] + def duration_since_transmitted + last_transmitted_at && ((Time.now - last_transmitted_at)*1000).to_i + end + + # Number of milliseconds since data was last received from the broker or + # `nil` if no data has been received when the method is called. + # @return [Fixnum, nil] + def duration_since_received + last_received_at && ((Time.now - last_received_at)*1000).to_i + end + # Flushes the write buffer by invoking {#io_process_write} until the # buffer is empty. def flush_write_buffer io_process_write until @write_buffer.empty? end @@ -120,10 +139,11 @@ # Adds data and frame pair to the end of the write buffer # @param [String] data # @param [OnStomp::Components::Frame] def push_write_buffer data, frame @write_mutex.synchronize { + @last_write_activity = Time.now if @write_buffer.empty? @write_buffer << [data, frame] unless @closing } end # Removes the first data and frame pair from the write buffer # @param [String] data @@ -144,77 +164,163 @@ # to notify the client that the frame has been sent to the broker. If a # complete frame cannot be written without blocking, the unsent data is # sent to the head of the write buffer to be processed first the next time # this method is invoked. def io_process_write - begin - if @write_buffer.length > 0 && IO.select(nil, [socket], nil, 0.1) - to_shift = @write_buffer.length / 3 - written = 0 - while written < MAX_BYTES_PER_WRITE - data, frame = shift_write_buffer - break unless data && connected? - begin - w = socket.write_nonblock(data) - rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK - # writing will either block, or cannot otherwise be completed, - # put data back and try again some other day - unshift_write_buffer data, frame - break - end - written += w - @last_transmitted_at = Time.now - if w < data.length - unshift_write_buffer data[w..-1], frame - else - yield frame if block_given? - client.dispatch_transmitted frame - end + if ready_for_write? + to_shift = @write_buffer.length / 3 + written = 0 + while written < MAX_BYTES_PER_WRITE + data, frame = shift_write_buffer + break unless data && connected? + begin + w = write_nonblock data + rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK + # writing will either block, or cannot otherwise be completed, + # put data back and try again some other day + unshift_write_buffer data, frame + break + rescue Exception + triggered_close $!.message, :terminated + raise end + written += w + @last_write_activity = @last_transmitted_at = Time.now + if w < data.length + unshift_write_buffer data[w..-1], frame + else + yield frame if block_given? + client.dispatch_transmitted frame + end end - rescue Exception - triggered_close $!.message, :terminated - raise + elsif write_timeout_exceeded? + triggered_close 'write blocked', :blocked end if @write_buffer.empty? && @closing triggered_close 'client disconnected' end end # Reads serialized frame data from the socket if we're connected and # and the socket is ready for reading. The received data will be pushed # to the end of a read buffer, which is then sent to the connection's # {OnStomp::Connections::Serializers serializer} for processing. - def io_process_read - begin - if connected? && IO.select([socket], nil, nil, 0.1) - if data = socket.read_nonblock(MAX_BYTES_PER_READ) + def io_process_read(check_timeout=false) + if ready_for_read? + begin + if data = read_nonblock @read_buffer << data @last_received_at = Time.now serializer.bytes_to_frame(@read_buffer) do |frame| yield frame if block_given? client.dispatch_received frame end - else - triggered_close $!.message, :terminated end + rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK + # do not + rescue EOFError + triggered_close $!.message + rescue Exception + triggered_close $!.message, :terminated + raise end - rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK - # do not - rescue EOFError - triggered_close $!.message + elsif check_timeout && read_timeout_exceeded? + triggered_close 'read blocked', :blocked + end + end + + private + def duration_since_write_activity + Time.now - @last_write_activity + end + + # Returns true if the connection has buffered data to write and the + # socket is ready to be written to. If checking the socket's state raises + # an exception, the connection will be closed (triggering an + # `on_terminated` event) and the error will be re-raised. + def ready_for_write? + begin + @write_buffer.length > 0 && IO.select(nil, [socket], nil, 0.1) rescue Exception triggered_close $!.message, :terminated raise end end - private + # Returns true if the connection has buffered data to write and the + # socket is ready to be written to. If checking the socket's state raises + # an exception, the connection will be closed (triggering an + # `on_terminated` event) and the error will be re-raised. + def ready_for_read? + begin + connected? && IO.select([socket], nil, nil, 0.1) + rescue Exception + triggered_close $!.message, :terminated + raise + end + end + + # Returns true if a `write_timeout` has been set, the connection has buffered + # data to write, and `duration_since_transmitted` is greater than + # `write_timeout` + def write_timeout_exceeded? + @write_timeout && @write_buffer.length > 0 && + duration_since_write_activity > @write_timeout + end + + # Returns true if a `read_timeout` has been set and + # `duration_since_received` is greater than `read_timeout` + # This is only used when establishing the connection through the CONNECT/ + # CONNECTED handshake. After that, it is up to heart-beating. + def read_timeout_exceeded? + @read_timeout && duration_since_received > (@read_timeout*1000) + end + def triggered_close msg, *evs @connection_up = false @closing = false + # unless socket.closed? + # socket.to_io.shutdown(2) rescue nil + # + # end socket.close rescue nil evs.each { |ev| trigger_connection_event ev, msg } trigger_connection_event :closed, msg @write_buffer.clear + end + + # OpenSSL sockets in Ruby 1.8.7 and JRuby (as of jruby-openssl 0.7.3) + # do NOT support non-blocking IO natively. Such a hack, and such a huge + # oversight on my part. We define some methods on this instance to use + # the right read/write operations. Fortunately, this gets done at + # initialization and only has to happen once. + def setup_non_blocking_methods + read_mod = @socket.respond_to?(:read_nonblock) ? NonblockingRead : + BlockingRead + write_mod = @socket.respond_to?(:write_nonblock) ? NonblockingWrite : + BlockingWrite + self.extend read_mod + self.extend write_mod + end + + module NonblockingRead + def read_nonblock + socket.read_nonblock MAX_BYTES_PER_READ + end + end + module NonblockingWrite + def write_nonblock data + socket.write_nonblock data + end + end + + module BlockingRead + def read_nonblock + socket.readpartial MAX_BYTES_PER_READ + end + end + module BlockingWrite + def write_nonblock data + socket.write data + end end end