lib/onstomp/connections/base.rb in onstomp-1.0.3 vs lib/onstomp/connections/base.rb in onstomp-1.0.4
- old
+ new
@@ -3,10 +3,11 @@
# Common behavior for all connections.
class OnStomp::Connections::Base
include OnStomp::Interfaces::ConnectionEvents
attr_reader :version, :socket, :client
attr_reader :last_transmitted_at, :last_received_at
+ attr_accessor :write_timeout, :read_timeout
# The approximate maximum number of bytes to write per call to
# {#io_process_write}.
MAX_BYTES_PER_WRITE = 1024 * 8
# The maximum number of bytes to read per call to {#io_process_read}
@@ -24,10 +25,13 @@
@closing = false
@write_buffer = []
@read_buffer = []
@client = client
@connection_up = false
+ @write_timeout = nil
+ @read_timeout = nil
+ setup_non_blocking_methods
end
# Performs any necessary configuration of the connection from the CONNECTED
# frame sent by the broker and a `Hash` of pending callbacks. This method
# is called after the protocol negotiation has taken place between client
@@ -69,13 +73,14 @@
write_frame_nonblock connect_frame(*headers)
client_con = nil
until client_con
io_process_write { |f| client_con ||= f }
end
+ @last_received_at = Time.now
broker_con = nil
until broker_con
- io_process_read { |f| broker_con ||= f }
+ io_process_read(true) { |f| broker_con ||= f }
end
raise OnStomp::ConnectFailedError if broker_con.command != 'CONNECTED'
vers = broker_con.header?(:version) ? broker_con[:version] : '1.0'
raise OnStomp::UnsupportedProtocolVersionError, vers unless client.versions.include?(vers)
@connection_up = true
@@ -91,10 +96,24 @@
else
super
end
end
+ # Number of milliseconds since data was last transmitted to the broker or
+ # `nil` if no data has been transmitted when the method is called.
+ # @return [Fixnum, nil]
+ def duration_since_transmitted
+ last_transmitted_at && ((Time.now - last_transmitted_at)*1000).to_i
+ end
+
+ # Number of milliseconds since data was last received from the broker or
+ # `nil` if no data has been received when the method is called.
+ # @return [Fixnum, nil]
+ def duration_since_received
+ last_received_at && ((Time.now - last_received_at)*1000).to_i
+ end
+
# Flushes the write buffer by invoking {#io_process_write} until the
# buffer is empty.
def flush_write_buffer
io_process_write until @write_buffer.empty?
end
@@ -120,10 +139,11 @@
# Adds data and frame pair to the end of the write buffer
# @param [String] data
# @param [OnStomp::Components::Frame]
def push_write_buffer data, frame
@write_mutex.synchronize {
+ @last_write_activity = Time.now if @write_buffer.empty?
@write_buffer << [data, frame] unless @closing
}
end
# Removes the first data and frame pair from the write buffer
# @param [String] data
@@ -144,77 +164,163 @@
# to notify the client that the frame has been sent to the broker. If a
# complete frame cannot be written without blocking, the unsent data is
# sent to the head of the write buffer to be processed first the next time
# this method is invoked.
def io_process_write
- begin
- if @write_buffer.length > 0 && IO.select(nil, [socket], nil, 0.1)
- to_shift = @write_buffer.length / 3
- written = 0
- while written < MAX_BYTES_PER_WRITE
- data, frame = shift_write_buffer
- break unless data && connected?
- begin
- w = socket.write_nonblock(data)
- rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK
- # writing will either block, or cannot otherwise be completed,
- # put data back and try again some other day
- unshift_write_buffer data, frame
- break
- end
- written += w
- @last_transmitted_at = Time.now
- if w < data.length
- unshift_write_buffer data[w..-1], frame
- else
- yield frame if block_given?
- client.dispatch_transmitted frame
- end
+ if ready_for_write?
+ to_shift = @write_buffer.length / 3
+ written = 0
+ while written < MAX_BYTES_PER_WRITE
+ data, frame = shift_write_buffer
+ break unless data && connected?
+ begin
+ w = write_nonblock data
+ rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK
+ # writing will either block, or cannot otherwise be completed,
+ # put data back and try again some other day
+ unshift_write_buffer data, frame
+ break
+ rescue Exception
+ triggered_close $!.message, :terminated
+ raise
end
+ written += w
+ @last_write_activity = @last_transmitted_at = Time.now
+ if w < data.length
+ unshift_write_buffer data[w..-1], frame
+ else
+ yield frame if block_given?
+ client.dispatch_transmitted frame
+ end
end
- rescue Exception
- triggered_close $!.message, :terminated
- raise
+ elsif write_timeout_exceeded?
+ triggered_close 'write blocked', :blocked
end
if @write_buffer.empty? && @closing
triggered_close 'client disconnected'
end
end
# Reads serialized frame data from the socket if we're connected and
# and the socket is ready for reading. The received data will be pushed
# to the end of a read buffer, which is then sent to the connection's
# {OnStomp::Connections::Serializers serializer} for processing.
- def io_process_read
- begin
- if connected? && IO.select([socket], nil, nil, 0.1)
- if data = socket.read_nonblock(MAX_BYTES_PER_READ)
+ def io_process_read(check_timeout=false)
+ if ready_for_read?
+ begin
+ if data = read_nonblock
@read_buffer << data
@last_received_at = Time.now
serializer.bytes_to_frame(@read_buffer) do |frame|
yield frame if block_given?
client.dispatch_received frame
end
- else
- triggered_close $!.message, :terminated
end
+ rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK
+ # do not
+ rescue EOFError
+ triggered_close $!.message
+ rescue Exception
+ triggered_close $!.message, :terminated
+ raise
end
- rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK
- # do not
- rescue EOFError
- triggered_close $!.message
+ elsif check_timeout && read_timeout_exceeded?
+ triggered_close 'read blocked', :blocked
+ end
+ end
+
+ private
+ def duration_since_write_activity
+ Time.now - @last_write_activity
+ end
+
+ # Returns true if the connection has buffered data to write and the
+ # socket is ready to be written to. If checking the socket's state raises
+ # an exception, the connection will be closed (triggering an
+ # `on_terminated` event) and the error will be re-raised.
+ def ready_for_write?
+ begin
+ @write_buffer.length > 0 && IO.select(nil, [socket], nil, 0.1)
rescue Exception
triggered_close $!.message, :terminated
raise
end
end
- private
+ # Returns true if the connection has buffered data to write and the
+ # socket is ready to be written to. If checking the socket's state raises
+ # an exception, the connection will be closed (triggering an
+ # `on_terminated` event) and the error will be re-raised.
+ def ready_for_read?
+ begin
+ connected? && IO.select([socket], nil, nil, 0.1)
+ rescue Exception
+ triggered_close $!.message, :terminated
+ raise
+ end
+ end
+
+ # Returns true if a `write_timeout` has been set, the connection has buffered
+ # data to write, and `duration_since_transmitted` is greater than
+ # `write_timeout`
+ def write_timeout_exceeded?
+ @write_timeout && @write_buffer.length > 0 &&
+ duration_since_write_activity > @write_timeout
+ end
+
+ # Returns true if a `read_timeout` has been set and
+ # `duration_since_received` is greater than `read_timeout`
+ # This is only used when establishing the connection through the CONNECT/
+ # CONNECTED handshake. After that, it is up to heart-beating.
+ def read_timeout_exceeded?
+ @read_timeout && duration_since_received > (@read_timeout*1000)
+ end
+
def triggered_close msg, *evs
@connection_up = false
@closing = false
+ # unless socket.closed?
+ # socket.to_io.shutdown(2) rescue nil
+ #
+ # end
socket.close rescue nil
evs.each { |ev| trigger_connection_event ev, msg }
trigger_connection_event :closed, msg
@write_buffer.clear
+ end
+
+ # OpenSSL sockets in Ruby 1.8.7 and JRuby (as of jruby-openssl 0.7.3)
+ # do NOT support non-blocking IO natively. Such a hack, and such a huge
+ # oversight on my part. We define some methods on this instance to use
+ # the right read/write operations. Fortunately, this gets done at
+ # initialization and only has to happen once.
+ def setup_non_blocking_methods
+ read_mod = @socket.respond_to?(:read_nonblock) ? NonblockingRead :
+ BlockingRead
+ write_mod = @socket.respond_to?(:write_nonblock) ? NonblockingWrite :
+ BlockingWrite
+ self.extend read_mod
+ self.extend write_mod
+ end
+
+ module NonblockingRead
+ def read_nonblock
+ socket.read_nonblock MAX_BYTES_PER_READ
+ end
+ end
+ module NonblockingWrite
+ def write_nonblock data
+ socket.write_nonblock data
+ end
+ end
+
+ module BlockingRead
+ def read_nonblock
+ socket.readpartial MAX_BYTES_PER_READ
+ end
+ end
+ module BlockingWrite
+ def write_nonblock data
+ socket.write data
+ end
end
end