lib/zmachine/tcp_channel.rb in zmachine-0.1.3 vs lib/zmachine/tcp_channel.rb in zmachine-0.2.0
- old
+ new
@@ -1,181 +1,119 @@
+java_import java.io.IOException
+java_import java.net.InetSocketAddress
+java_import java.nio.channels.SocketChannel
java_import java.nio.channels.ServerSocketChannel
require 'zmachine/channel'
module ZMachine
class TCPChannel < Channel
- attr_reader :connect_pending
-
- def initialize(selector)
- super(selector)
- @close_scheduled = false
- @connect_pending = false
- @server_socket = false
+ def selectable_fd
+ @socket
end
- def register
- @channel_key ||= @socket.register(@selector, current_events, self)
- end
-
def bind(address, port)
- @server_socket = true
address = InetSocketAddress.new(address, port)
@socket = ServerSocketChannel.open
@socket.configure_blocking(false)
@socket.bind(address)
end
+ def bound?
+ @socket.is_a?(ServerSocketChannel) and @socket.bound?
+ end
+
def accept
- client_socket = socket.accept
+ client_socket = @socket.accept
return unless client_socket
client_socket.configure_blocking(false)
- channel = TCPChannel.new(@selector)
- channel.socket = client_socket
- channel
+ TCPChannel.new.tap do |channel|
+ channel.socket = client_socket
+ end
end
def connect(address, port)
address = InetSocketAddress.new(address, port)
@socket = SocketChannel.open
@socket.configure_blocking(false)
-
if socket.connect(address)
# Connection returned immediately. Can happen with localhost
# connections.
# WARNING, this code is untested due to lack of available test
# conditions. Ought to be be able to come here from a localhost
# connection, but that doesn't happen on Linux. (Maybe on FreeBSD?)
- # The reason for not handling this until we can test it is that we
- # really need to return from this function WITHOUT triggering any EM
- # events. That's because until the user code has seen the signature
- # we generated here, it won't be able to properly dispatch them. The
- # C++ EM deals with this by setting pending mode as a flag in ALL
- # eventable descriptors and making the descriptor select for
- # writable. Then, it can send UNBOUND and CONNECTION_COMPLETED on the
- # next pass through the loop, because writable will fire.
raise RuntimeError.new("immediate-connect unimplemented")
end
end
- def close_connection(flush = true)
- @reactor.unbind_channel(self) if schedule_close(flush)
+ def connection_pending?
+ @socket.connection_pending?
end
- def close
- if @channel_key
- @channel_key.cancel
- @channel_key = nil
- end
-
- @socket.close rescue nil
+ def finish_connecting
+ return unless connection_pending?
+ @socket.finish_connect # XXX: finish_connect might return false
+ return true
end
- def send_data(data)
- return if @close_scheduled
- buffer = ByteBuffer.wrap(data.to_java_bytes)
- if buffer.remaining() > 0
- @outbound_queue << buffer
- update_events
- end
+ def connected?
+ @socket.connected?
end
- def read_inbound_data(buffer)
+ def read_inbound_data
+ buffer = @inbound_buffer
buffer.clear
- raise IOException.new("eof") if @socket.read(buffer) == -1
+ raise IOException.new("EOF") if @socket.read(buffer) == -1
buffer.flip
return if buffer.limit == 0
String.from_java_bytes(buffer.array[buffer.position...buffer.limit])
end
+ def send_data(data)
+ raise RuntimeError.new("send_data called after close") if @close_scheduled
+ return unless data
+ data = data.to_java_bytes if data.is_a?(String) # EM compat
+ buffer = ByteBuffer.wrap(data)
+ if buffer.has_remaining
+ @outbound_queue << buffer
+ end
+ end
+
def write_outbound_data
- until @outbound_queue.empty?
+ while can_send?
buffer = @outbound_queue.first
- @socket.write(buffer) if buffer.remaining > 0
+ @socket.write(buffer) if buffer.has_remaining
# Did we consume the whole outbound buffer? If yes,
# pop it off and keep looping. If no, the outbound network
# buffers are full, so break out of here.
if buffer.remaining == 0
@outbound_queue.shift
else
break
end
end
- if @outbound_queue.empty? && !@close_scheduled
- update_events
+ if can_send?
+ # network buffers are full
+ return false
end
- return (@close_scheduled && @outbound_queue.empty?) ? false : true
- end
-
- def finish_connecting
- @socket.finish_connect
- @connect_pending = false
- update_events
+ close if @close_scheduled
return true
end
- def schedule_close(after_writing)
- @outbound_queue.clear unless after_writing
-
- if @outbound_queue.empty?
- return true
- else
- update_events
- @close_scheduled = true
- return false
- end
+ def close(after_writing = false)
+ super
+ @socket.close unless can_send?
end
- # TODO: fix these
- def peer_name
- sock = @socket.socket
- [sock.port, sock.inet_address.host_address]
+ def closed?
+ @socket.socket.closed?
end
- def sock_name
- sock = @socket.socket
- [sock.local_port, sock.local_address.host_address]
+ def peer
+ [@socket.socket.port, @socket.socket.inet_address.host_address]
end
- def connect_pending=(value)
- @connect_pending = value
- update_events
- end
-
- def update_events
- return unless @channel_key
- events = current_events
- if @channel_key.interest_ops != events
- @channel_key.interest_ops(events)
- end
- end
-
- # these two are a bit misleading .. only used for the zmq channel
- def has_more?
- false
- end
-
- def can_send?
- false
- end
-
- def current_events
- if @socket.is_a?(ServerSocketChannel)
- return SelectionKey::OP_ACCEPT
- end
-
- events = 0
-
- if @connect_pending
- events |= SelectionKey::OP_CONNECT
- else
- events |= SelectionKey::OP_READ
- events |= SelectionKey::OP_WRITE unless @outbound_queue.empty?
- end
-
- return events
- end
end
end