lib/em-synchrony/tcpsocket.rb in em-synchrony-1.0.1 vs lib/em-synchrony/tcpsocket.rb in em-synchrony-1.0.2
- old
+ new
@@ -5,103 +5,232 @@
alias_method :_old_new, :new
def new(*args)
if args.size == 1
_old_new(*args)
else
- socket = EventMachine::connect(*args[0..1], self)
- raise SocketError unless socket.sync(:in) # wait for connection
+ # In TCPSocket, new against an unknown hostname raises SocketError with
+ # a message "getaddrinfo: nodename nor servname provided, or not known".
+ # In EM, connect against an unknown hostname raises EM::ConnectionError
+ # with a message of "unable to resolve server address"
+ begin
+ socket = EventMachine::connect(*args[0..1], self)
+ rescue EventMachine::ConnectionError => e
+ raise SocketError, e.message
+ end
+ # In TCPSocket, new against a closed port raises Errno::ECONNREFUSED.
+ # In EM, connect against a closed port result in a call to unbind with
+ # a reason param of Errno::ECONNREFUSED as a class, not an instance.
+ unless socket.sync(:in) # wait for connection
+ raise socket.unbind_reason.new if socket.unbind_reason.is_a? Class
+ raise SocketError
+ end
socket
end
end
alias :open :new
end
def post_init
@in_buff, @out_buff = '', ''
- @in_req = @out_req = nil
+ @in_req = @out_req = @unbind_reason = @read_type = nil
+ @opening = true
+ @closed = @remote_closed = false
end
def closed?
- @in_req.nil? && @out_req.nil?
+ # In TCPSocket,
+ # closed? on a remotely closed socket, when we've not yet read EOF, returns false
+ # closed? on a remotely closed socket, when we've read EOF, returns false
+ # closed? on a socket after #close, returns true
+ # Therefore, we set @close to true when #close is called, but not when unbind is.
+ @closed
end
# direction must be one of :in or :out
def sync(direction)
req = self.instance_variable_set "@#{direction.to_s}_req", EventMachine::DefaultDeferrable.new
EventMachine::Synchrony.sync req
+ ensure
+ self.instance_variable_set "@#{direction.to_s}_req", nil
end
# TCPSocket interface
def setsockopt(level, name, value); end
- def send(msg, flags = 0)
+ def send(msg, flags)
raise "Unknown flags in send(): #{flags}" if flags.nonzero?
+ # write(X) on a socket after #close, raises IOError with message "closed stream"
+ # send(X,0) on a socket after #close, raises IOError with message "closed stream"
+ raise IOError, "closed stream" if @closed
+ # the first write(X) on a remotely closed socket, <= than some buffer size, generates no error
+ # the first write(X) on a remotely closed socket, > than some buffer size, generates no error
+ # (on my box this buffer appears to be 80KB)
+ # further write(X) on a remotely closed socket, raises Errno::EPIPE
+ # the first send(X,0) on a remotely closed socket, <= than some buffer size, generates no error
+ # the first send(X,0) on a remotely closed socket, > than some buffer size, generates no error
+ # (on my box this buffer appears to be 80KB)
+ # further send(X,0) on a remotely closed socket, raises Errno::EPIPE
+ raise Errno::EPIPE if @remote_closed
+
len = msg.bytesize
- write_data(msg) or sync(:out) or raise(IOError)
+ # write(X) on an open socket, where the remote end closes during the write, raises Errno::EPIPE
+ # send(X,0) on an open socket, where the remote end closes during the write, raises Errno::EPIPE
+ write_data(msg) or sync(:out) or raise(Errno::EPIPE)
len
end
- alias_method :write, :send
+
+ def write(msg)
+ send(msg,0)
+ end
- def read(num_bytes = 16*1024, dest = nil)
- read_data(num_bytes, dest) or sync(:in) or raise(IOError)
+ def read(num_bytes = nil, dest = nil)
+ handle_read(:read, num_bytes, dest)
end
alias_method :read_nonblock, :read
- alias_method :recv, :read
+ def recv(num_bytes, flags = 0)
+ raise "Unknown flags in recv(): #{flags}" if flags.nonzero?
+ handle_read(:recv, num_bytes)
+ end
+
def close
+ # close on a closed socket raises IOError with a message of "closed stream"
+ raise IOError, "closed stream" if @closed
+ @closed = true
close_connection true
@in_req = @out_req = nil
+ # close on an open socket returns nil
+ nil
end
# EventMachine interface
def connection_completed
+ @opening = false
@in_req.succeed self
end
-
- def unbind
- @in_req.fail nil if @in_req
+
+ attr_reader :unbind_reason
+
+ # Can't set a default value for reason (e.g. reason=nil), as in that case
+ # EM fails to pass in the reason argument and you'll always get the default
+ # value.
+ def unbind(reason)
+ @unbind_reason = reason
+ @remote_closed = true unless @closed
+ if @opening
+ @in_req.fail nil if @in_req
+ else
+ @in_req.succeed read_data if @in_req
+ end
@out_req.fail nil if @out_req
- close
+ @in_req = @out_req = nil
end
def receive_data(data)
@in_buff << data
if @in_req && (data = read_data)
- @in_req.succeed data
+ @in_req.succeed data unless data == :block
end
end
protected
- def read_data(num_bytes = nil, dest = nil)
- @read_bytes = num_bytes if num_bytes
- @read_dest = dest if dest
- if @in_buff.size > 0
- data = @in_buff.slice!(0, @read_bytes)
- @read_bytes = 0
+ def handle_read(type, num_bytes, dest=nil)
+ # read(-n) always raises ArgumentError
+ # recv(-n) always raises ArgumentError
+ raise ArgumentError, "negative length #{num_bytes} given" if num_bytes != nil and num_bytes < 0
+ # read(n) on a socket after #close, raises IOError with message "closed stream"
+ # read(0) on a socket after #close, raises IOError with message "closed stream"
+ # read() on a socket after #close, raises IOError with message "closed stream"
+ # recv(n) on a socket after #close, raises IOError with message "closed stream"
+ # recv(0) on a socket after #close, raises IOError with message "closed stream"
+ raise IOError, "closed stream" if @closed
+ # read(0) on an open socket, return ""
+ # read(0) on a remotely closed socket, with buffered data, returns ""
+ # read(0) on a remotely closed socket, with no buffered data, returns ""
+ # recv(0) on an open socket, return ""
+ # recv(0) on a remotely closed socket, with buffered data, returns ""
+ # recv(0) on a remotely closed socket, with no buffered data, returns ""
+ return "" if num_bytes == 0
- if @read_dest
- @read_dest.replace data
- @read_dest = nil
+ @read_type = type
+ @read_bytes = num_bytes
+ @read_dest = dest if dest
+
+ (data = read_data) != :block ? data : sync(:in)
+ end
+
+ def try_read_data
+ if @read_type == :read
+ unless @remote_closed
+ if @read_bytes
+ # read(n) on an open socket, with >= than n buffered data, returns n data
+ if @in_buff.size >= @read_bytes then @in_buff.slice!(0, @read_bytes)
+ # read(n) on an open socket, with < than n buffered data, blocks
+ else :block end
+ else
+ # read() on an open socket, blocks until a remote close and returns all the data sent
+ :block
+ end
+ else
+ if @read_bytes
+ # read(n) on a remotely closed socket, with no buffered data, returns nil
+ if @in_buff.empty? then nil
+ # read(n) on a remotely closed socket, with buffered data, returns the buffered data up to n
+ else @in_buff.slice!(0, @read_bytes) end
+ else
+ # read() on a remotely closed socket, with no buffered data, returns ""
+ if @in_buff.empty? then ""
+ # read() on a remotely closed socket, with buffered data, returns the buffered data
+ else @in_buff.slice!(0, @in_buff.size) end
+ end
end
- data
- else
- nil
+ else #recv
+ unless @remote_closed
+ # recv(n) on an open socket, with no buffered data, blocks
+ if @in_buff.empty? then :block
+ # recv(n) on an open socket, with < than n buffered data, return the buffered data
+ # recv(n) on an open socket, with >= than n buffered data, returns n data
+ else @in_buff.slice!(0, @read_bytes) end
+ else
+ # recv(n) on a remotely closed socket, with no buffered data, returns ""
+ if @in_buff.empty? then ""
+ # recv(n) on a remotely closed socket, with < than n buffered data, return the buffered data
+ # recv(n) on a remotely closed socket, with >= than n buffered data, returns n data
+ else @in_buff.slice!(0, @read_bytes) end
+ end
end
end
-
+
+ def read_data
+ data = try_read_data
+ unless data == :block
+ @read_bytes = 0
+ # read(n,buffer) returns the buffer when it does not return nil or raise an exception
+ data = @read_dest.replace(data) if @read_dest and not data.nil?
+ @read_dest = nil
+ end
+ data
+ end
+
def write_data(data = nil)
@out_buff += data if data
loop do
if @out_buff.empty?
@out_req.succeed true if @out_req
return true
end
if self.get_outbound_data_size > EventMachine::FileStreamer::BackpressureLevel
+ # write(X) on an open socket, where the remote end is not reading, > than some buffer size, blocks
+ # send(X,0) on an open socket, where the remote end is not reading, > than some buffer size, blocks
+ # where that buffer size is EventMachine::FileStreamer::BackpressureLevel, returning false will
+ # cause write/send to block
EventMachine::next_tick { write_data }
return false
else
+ # write(X) on an open socket, where the remote end is not reading, <= than some buffer size, sends and returns
+ # send(X,0) on an open socket, where the remote end is not reading, <= than some buffer size, sends returns
len = [@out_buff.bytesize, EventMachine::FileStreamer::ChunkSize].min
self.send_data @out_buff.slice!( 0, len )
end
end
end