lib/zmachine/tcp_channel.rb in zmachine-0.1.3 vs lib/zmachine/tcp_channel.rb in zmachine-0.2.0

- old
+ new

@@ -1,181 +1,119 @@ +java_import java.io.IOException +java_import java.net.InetSocketAddress +java_import java.nio.channels.SocketChannel java_import java.nio.channels.ServerSocketChannel require 'zmachine/channel' module ZMachine class TCPChannel < Channel - attr_reader :connect_pending - - def initialize(selector) - super(selector) - @close_scheduled = false - @connect_pending = false - @server_socket = false + def selectable_fd + @socket end - def register - @channel_key ||= @socket.register(@selector, current_events, self) - end - def bind(address, port) - @server_socket = true address = InetSocketAddress.new(address, port) @socket = ServerSocketChannel.open @socket.configure_blocking(false) @socket.bind(address) end + def bound? + @socket.is_a?(ServerSocketChannel) and @socket.bound? + end + def accept - client_socket = socket.accept + client_socket = @socket.accept return unless client_socket client_socket.configure_blocking(false) - channel = TCPChannel.new(@selector) - channel.socket = client_socket - channel + TCPChannel.new.tap do |channel| + channel.socket = client_socket + end end def connect(address, port) address = InetSocketAddress.new(address, port) @socket = SocketChannel.open @socket.configure_blocking(false) - if socket.connect(address) # Connection returned immediately. Can happen with localhost # connections. # WARNING, this code is untested due to lack of available test # conditions. Ought to be be able to come here from a localhost # connection, but that doesn't happen on Linux. (Maybe on FreeBSD?) - # The reason for not handling this until we can test it is that we - # really need to return from this function WITHOUT triggering any EM - # events. That's because until the user code has seen the signature - # we generated here, it won't be able to properly dispatch them. The - # C++ EM deals with this by setting pending mode as a flag in ALL - # eventable descriptors and making the descriptor select for - # writable. Then, it can send UNBOUND and CONNECTION_COMPLETED on the - # next pass through the loop, because writable will fire. raise RuntimeError.new("immediate-connect unimplemented") end end - def close_connection(flush = true) - @reactor.unbind_channel(self) if schedule_close(flush) + def connection_pending? + @socket.connection_pending? end - def close - if @channel_key - @channel_key.cancel - @channel_key = nil - end - - @socket.close rescue nil + def finish_connecting + return unless connection_pending? + @socket.finish_connect # XXX: finish_connect might return false + return true end - def send_data(data) - return if @close_scheduled - buffer = ByteBuffer.wrap(data.to_java_bytes) - if buffer.remaining() > 0 - @outbound_queue << buffer - update_events - end + def connected? + @socket.connected? end - def read_inbound_data(buffer) + def read_inbound_data + buffer = @inbound_buffer buffer.clear - raise IOException.new("eof") if @socket.read(buffer) == -1 + raise IOException.new("EOF") if @socket.read(buffer) == -1 buffer.flip return if buffer.limit == 0 String.from_java_bytes(buffer.array[buffer.position...buffer.limit]) end + def send_data(data) + raise RuntimeError.new("send_data called after close") if @close_scheduled + return unless data + data = data.to_java_bytes if data.is_a?(String) # EM compat + buffer = ByteBuffer.wrap(data) + if buffer.has_remaining + @outbound_queue << buffer + end + end + def write_outbound_data - until @outbound_queue.empty? + while can_send? buffer = @outbound_queue.first - @socket.write(buffer) if buffer.remaining > 0 + @socket.write(buffer) if buffer.has_remaining # Did we consume the whole outbound buffer? If yes, # pop it off and keep looping. If no, the outbound network # buffers are full, so break out of here. if buffer.remaining == 0 @outbound_queue.shift else break end end - if @outbound_queue.empty? && !@close_scheduled - update_events + if can_send? + # network buffers are full + return false end - return (@close_scheduled && @outbound_queue.empty?) ? false : true - end - - def finish_connecting - @socket.finish_connect - @connect_pending = false - update_events + close if @close_scheduled return true end - def schedule_close(after_writing) - @outbound_queue.clear unless after_writing - - if @outbound_queue.empty? - return true - else - update_events - @close_scheduled = true - return false - end + def close(after_writing = false) + super + @socket.close unless can_send? end - # TODO: fix these - def peer_name - sock = @socket.socket - [sock.port, sock.inet_address.host_address] + def closed? + @socket.socket.closed? end - def sock_name - sock = @socket.socket - [sock.local_port, sock.local_address.host_address] + def peer + [@socket.socket.port, @socket.socket.inet_address.host_address] end - def connect_pending=(value) - @connect_pending = value - update_events - end - - def update_events - return unless @channel_key - events = current_events - if @channel_key.interest_ops != events - @channel_key.interest_ops(events) - end - end - - # these two are a bit misleading .. only used for the zmq channel - def has_more? - false - end - - def can_send? - false - end - - def current_events - if @socket.is_a?(ServerSocketChannel) - return SelectionKey::OP_ACCEPT - end - - events = 0 - - if @connect_pending - events |= SelectionKey::OP_CONNECT - else - events |= SelectionKey::OP_READ - events |= SelectionKey::OP_WRITE unless @outbound_queue.empty? - end - - return events - end end end