lib/amqp/client/connection.rb in amqp-client-1.1.1 vs lib/amqp/client/connection.rb in amqp-client-1.1.2
- old
+ new
@@ -131,13 +131,17 @@
def write_bytes(*bytes)
blocked = @blocked
warn "AMQP-Client blocked by broker: #{blocked}" if blocked
@write_lock.synchronize do
warn "AMQP-Client unblocked by broker" if blocked
- @socket.write(*bytes)
+ if RUBY_ENGINE == "truffleruby"
+ bytes.each { |b| @socket.write b }
+ else
+ @socket.write(*bytes)
+ end
end
- rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e
+ rescue *READ_EXCEPTIONS => e
raise Error::ConnectionClosed.new(*@closed) if @closed
raise Error, "Could not write to socket, #{e.message}"
end
@@ -150,42 +154,45 @@
socket = @socket
frame_max = @frame_max
frame_start = String.new(capacity: 7)
frame_buffer = String.new(capacity: frame_max)
loop do
- socket.read(7, frame_start)
+ socket.read(7, frame_start) || raise(IOError)
type, channel_id, frame_size = frame_start.unpack("C S> L>")
frame_max >= frame_size || raise(Error, "Frame size #{frame_size} larger than negotiated max frame size #{frame_max}")
# read the frame content
- socket.read(frame_size, frame_buffer)
+ socket.read(frame_size, frame_buffer) || raise(IOError)
# make sure that the frame end is correct
frame_end = socket.readchar.ord
raise UnexpectedFrameEnd, frame_end if frame_end != 206
# parse the frame, will return false if a close frame was received
parse_frame(type, channel_id, frame_buffer) || return
end
nil
- rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e
+ rescue *READ_EXCEPTIONS => e
@closed ||= [400, "read error: #{e.message}"]
nil # ignore read errors
ensure
@closed ||= [400, "unknown"]
@replies.close
begin
@write_lock.synchronize do
@socket.close
end
- rescue IOError, OpenSSL::OpenSSLError, SystemCallError
+ rescue *READ_EXCEPTIONS
nil
end
end
private
+ READ_EXCEPTIONS = [IOError, OpenSSL::OpenSSLError, SystemCallError,
+ RUBY_ENGINE == "jruby" ? java.lang.NullPointerException : nil].compact.freeze
+
def parse_frame(type, channel_id, buf)
case type
when 1 # method frame
class_id, method_id = buf.unpack("S> S>")
case class_id
@@ -400,11 +407,11 @@
socket.write "AMQP\x00\x00\x09\x01"
buf = String.new(capacity: 4096)
loop do
begin
socket.readpartial(4096, buf)
- rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e
+ rescue *READ_EXCEPTIONS => e
raise Error, "Could not establish AMQP connection: #{e.message}"
end
type, channel_id, frame_size = buf.unpack("C S> L>")
frame_end = buf.getbyte(frame_size + 7)
@@ -442,13 +449,13 @@
else raise Error, "Unexpected class/method: #{class_id} #{method_id}"
end
else raise Error, "Unexpected frame type: #{type}"
end
end
- rescue StandardError => e
+ rescue Exception => e
begin
socket.close
- rescue IOError, OpenSSL::OpenSSLError, SystemCallError
+ rescue *READ_EXCEPTIONS
nil
end
raise e
end