lib/amqp/client/connection.rb in amqp-client-1.1.1 vs lib/amqp/client/connection.rb in amqp-client-1.1.2

- old
+ new

@@ -131,13 +131,17 @@ def write_bytes(*bytes) blocked = @blocked warn "AMQP-Client blocked by broker: #{blocked}" if blocked @write_lock.synchronize do warn "AMQP-Client unblocked by broker" if blocked - @socket.write(*bytes) + if RUBY_ENGINE == "truffleruby" + bytes.each { |b| @socket.write b } + else + @socket.write(*bytes) + end end - rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e + rescue *READ_EXCEPTIONS => e raise Error::ConnectionClosed.new(*@closed) if @closed raise Error, "Could not write to socket, #{e.message}" end @@ -150,42 +154,45 @@ socket = @socket frame_max = @frame_max frame_start = String.new(capacity: 7) frame_buffer = String.new(capacity: frame_max) loop do - socket.read(7, frame_start) + socket.read(7, frame_start) || raise(IOError) type, channel_id, frame_size = frame_start.unpack("C S> L>") frame_max >= frame_size || raise(Error, "Frame size #{frame_size} larger than negotiated max frame size #{frame_max}") # read the frame content - socket.read(frame_size, frame_buffer) + socket.read(frame_size, frame_buffer) || raise(IOError) # make sure that the frame end is correct frame_end = socket.readchar.ord raise UnexpectedFrameEnd, frame_end if frame_end != 206 # parse the frame, will return false if a close frame was received parse_frame(type, channel_id, frame_buffer) || return end nil - rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e + rescue *READ_EXCEPTIONS => e @closed ||= [400, "read error: #{e.message}"] nil # ignore read errors ensure @closed ||= [400, "unknown"] @replies.close begin @write_lock.synchronize do @socket.close end - rescue IOError, OpenSSL::OpenSSLError, SystemCallError + rescue *READ_EXCEPTIONS nil end end private + READ_EXCEPTIONS = [IOError, OpenSSL::OpenSSLError, SystemCallError, + RUBY_ENGINE == "jruby" ? java.lang.NullPointerException : nil].compact.freeze + def parse_frame(type, channel_id, buf) case type when 1 # method frame class_id, method_id = buf.unpack("S> S>") case class_id @@ -400,11 +407,11 @@ socket.write "AMQP\x00\x00\x09\x01" buf = String.new(capacity: 4096) loop do begin socket.readpartial(4096, buf) - rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e + rescue *READ_EXCEPTIONS => e raise Error, "Could not establish AMQP connection: #{e.message}" end type, channel_id, frame_size = buf.unpack("C S> L>") frame_end = buf.getbyte(frame_size + 7) @@ -442,13 +449,13 @@ else raise Error, "Unexpected class/method: #{class_id} #{method_id}" end else raise Error, "Unexpected frame type: #{type}" end end - rescue StandardError => e + rescue Exception => e begin socket.close - rescue IOError, OpenSSL::OpenSSLError, SystemCallError + rescue *READ_EXCEPTIONS nil end raise e end