/** * $Id$ * * Author:: Francis Cianfrocca (gmail: blackhedd) * Homepage:: http://rubyeventmachine.com * Date:: 15 Jul 2007 * * See EventMachine and EventMachine::Connection for documentation and * usage examples. * * *---------------------------------------------------------------------------- * * Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. * Gmail: blackhedd * * This program is free software; you can redistribute it and/or modify * it under the terms of either: 1) the GNU General Public License * as published by the Free Software Foundation; either version 2 of the * License, or (at your option) any later version; or 2) Ruby's License. * * See the file COPYING for complete licensing information. * *--------------------------------------------------------------------------- * * */ /** * */ package com.rubyeventmachine; /** * @author francis * */ import java.nio.channels.*; import java.nio.*; import java.util.*; import java.io.*; import java.net.Socket; import javax.net.ssl.*; import javax.net.ssl.SSLEngineResult.*; import java.lang.reflect.Field; import java.security.*; public class EventableSocketChannel implements EventableChannel { Selector selector; SelectionKey channelKey; SocketChannel channel; long binding; LinkedList outboundQ; long outboundS; boolean bCloseScheduled; boolean bConnectPending; boolean bWatchOnly; boolean bAttached; boolean bNotifyReadable; boolean bNotifyWritable; boolean bPaused; SSLEngine sslEngine; SSLContext sslContext; public EventableSocketChannel (SocketChannel sc, long _binding, Selector sel) { channel = sc; binding = _binding; selector = sel; bCloseScheduled = false; bConnectPending = false; bWatchOnly = false; bAttached = false; bNotifyReadable = false; bNotifyWritable = false; outboundQ = new LinkedList(); outboundS = 0; } public long getBinding() { return binding; } public SocketChannel getChannel() { return channel; } public void register() throws ClosedChannelException { if (channelKey == null) { int events = currentEvents(); channelKey = channel.register(selector, events, this); } } /** * Terminate with extreme prejudice. Don't assume there will be another pass through * the reactor core. */ public void close() { if (channelKey != null) { channelKey.cancel(); channelKey = null; } if (bAttached) { // attached channels are copies, so reset the file descriptor to prevent java from close()ing it Field f; FileDescriptor fd; try { /* do _NOT_ clobber fdVal here, it will break epoll/kqueue on jdk6! * channelKey.cancel() above does not occur until the next call to select * and if fdVal is gone, we will continue to get events for this fd. * * instead, remove fdVal in cleanup(), which is processed via DetachedConnections, * after UnboundConnections but before NewConnections. */ f = channel.getClass().getDeclaredField("fd"); f.setAccessible(true); fd = (FileDescriptor) f.get(channel); f = fd.getClass().getDeclaredField("fd"); f.setAccessible(true); f.set(fd, -1); } catch (java.lang.NoSuchFieldException e) { e.printStackTrace(); } catch (java.lang.IllegalAccessException e) { e.printStackTrace(); } return; } try { channel.close(); } catch (IOException e) { } } public void cleanup() { if (bAttached) { Field f; try { f = channel.getClass().getDeclaredField("fdVal"); f.setAccessible(true); f.set(channel, -1); } catch (java.lang.NoSuchFieldException e) { e.printStackTrace(); } catch (java.lang.IllegalAccessException e) { e.printStackTrace(); } } channel = null; } public void scheduleOutboundData (ByteBuffer bb) { if (!bCloseScheduled && bb.remaining() > 0) { if (sslEngine != null) { try { ByteBuffer b = ByteBuffer.allocate(32*1024); // TODO, preallocate this buffer. sslEngine.wrap(bb, b); b.flip(); outboundQ.addLast(b); outboundS += b.remaining(); } catch (SSLException e) { throw new RuntimeException ("ssl error"); } } else { outboundQ.addLast(bb); outboundS += bb.remaining(); } updateEvents(); } } public void scheduleOutboundDatagram (ByteBuffer bb, String recipAddress, int recipPort) { throw new RuntimeException ("datagram sends not supported on this channel"); } /** * Called by the reactor when we have selected readable. */ public void readInboundData (ByteBuffer bb) throws IOException { if (channel.read(bb) == -1) throw new IOException ("eof"); } public long getOutboundDataSize() { return outboundS; } /** * Called by the reactor when we have selected writable. * Return false to indicate an error that should cause the connection to close. * TODO, VERY IMPORTANT: we're here because we selected writable, but it's always * possible to become unwritable between the poll and when we get here. The way * this code is written, we're depending on a nonblocking write NOT TO CONSUME * the whole outbound buffer in this case, rather than firing an exception. * We should somehow verify that this is indeed Java's defined behavior. * @return */ public boolean writeOutboundData() throws IOException { ByteBuffer[] bufs = new ByteBuffer[64]; int i; long written, toWrite; while (!outboundQ.isEmpty()) { i = 0; toWrite = 0; written = 0; while (i < 64 && !outboundQ.isEmpty()) { bufs[i] = outboundQ.removeFirst(); toWrite += bufs[i].remaining(); i++; } if (toWrite > 0) written = channel.write(bufs, 0, i); outboundS -= written; // Did we consume the whole outbound buffer? If yes, // pop it off and keep looping. If no, the outbound network // buffers are full, so break out of here. if (written < toWrite) { while (i > 0 && bufs[i-1].remaining() > 0) { outboundQ.addFirst(bufs[i-1]); i--; } break; } } if (outboundQ.isEmpty() && !bCloseScheduled) { updateEvents(); } // ALWAYS drain the outbound queue before triggering a connection close. // If anyone wants to close immediately, they're responsible for clearing // the outbound queue. return (bCloseScheduled && outboundQ.isEmpty()) ? false : true; } public void setConnectPending() { bConnectPending = true; updateEvents(); } /** * Called by the reactor when we have selected connectable. * Return false to indicate an error that should cause the connection to close. */ public boolean finishConnecting() throws IOException { channel.finishConnect(); bConnectPending = false; updateEvents(); return true; } public boolean scheduleClose (boolean afterWriting) { // TODO: What the hell happens here if bConnectPending is set? if (!afterWriting) { outboundQ.clear(); outboundS = 0; } if (outboundQ.isEmpty()) return true; else { updateEvents(); bCloseScheduled = true; return false; } } public void startTls() { if (sslEngine == null) { try { sslContext = SSLContext.getInstance("TLS"); sslContext.init(null, null, null); // TODO, fill in the parameters. sslEngine = sslContext.createSSLEngine(); // TODO, should use the parameterized version, to get Kerb stuff and session re-use. sslEngine.setUseClientMode(false); } catch (NoSuchAlgorithmException e) { throw new RuntimeException ("unable to start TLS"); // TODO, get rid of this. } catch (KeyManagementException e) { throw new RuntimeException ("unable to start TLS"); // TODO, get rid of this. } } System.out.println ("Starting TLS"); } public ByteBuffer dispatchInboundData (ByteBuffer bb) throws SSLException { if (sslEngine != null) { if (true) throw new RuntimeException ("TLS currently unimplemented"); System.setProperty("javax.net.debug", "all"); ByteBuffer w = ByteBuffer.allocate(32*1024); // TODO, WRONG, preallocate this buffer. SSLEngineResult res = sslEngine.unwrap(bb, w); if (res.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { Runnable r; while ((r = sslEngine.getDelegatedTask()) != null) { r.run(); } } System.out.println (bb); w.flip(); return w; } else return bb; } public void setCommInactivityTimeout (long seconds) { // TODO System.out.println ("SOCKET: SET COMM INACTIVITY UNIMPLEMENTED " + seconds); } public Object[] getPeerName () { Socket sock = channel.socket(); return new Object[]{ sock.getPort(), sock.getInetAddress().getHostAddress() }; } public Object[] getSockName () { Socket sock = channel.socket(); return new Object[]{ sock.getLocalPort(), sock.getLocalAddress().getHostAddress() }; } public void setWatchOnly() { bWatchOnly = true; updateEvents(); } public boolean isWatchOnly() { return bWatchOnly; } public void setAttached() { bAttached = true; } public boolean isAttached() { return bAttached; } public void setNotifyReadable (boolean mode) { bNotifyReadable = mode; updateEvents(); } public boolean isNotifyReadable() { return bNotifyReadable; } public void setNotifyWritable (boolean mode) { bNotifyWritable = mode; updateEvents(); } public boolean isNotifyWritable() { return bNotifyWritable; } public boolean pause() { if (bWatchOnly) { throw new RuntimeException ("cannot pause/resume 'watch only' connections, set notify readable/writable instead"); } boolean old = bPaused; bPaused = true; updateEvents(); return !old; } public boolean resume() { if (bWatchOnly) { throw new RuntimeException ("cannot pause/resume 'watch only' connections, set notify readable/writable instead"); } boolean old = bPaused; bPaused = false; updateEvents(); return old; } public boolean isPaused() { return bPaused; } private void updateEvents() { if (channelKey == null) return; int events = currentEvents(); if (channelKey.interestOps() != events) { channelKey.interestOps(events); } } private int currentEvents() { int events = 0; if (bWatchOnly) { if (bNotifyReadable) events |= SelectionKey.OP_READ; if (bNotifyWritable) events |= SelectionKey.OP_WRITE; } else if (!bPaused) { if (bConnectPending) events |= SelectionKey.OP_CONNECT; else { events |= SelectionKey.OP_READ; if (!outboundQ.isEmpty()) events |= SelectionKey.OP_WRITE; } } return events; } }