package org.cometd.oort; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import org.cometd.Channel; import org.cometd.Client; import org.cometd.Message; import org.cometd.MessageListener; import org.mortbay.cometd.MessageImpl; import org.mortbay.cometd.client.BayeuxClient; import org.mortbay.log.Log; /** * Oort Comet client. *

* A BayeuxClient that connects the local Oort comet server to * a remote Oort comet server. * */ public class OortComet extends BayeuxClient { protected Oort _oort; protected String _cometUrl; protected String _cometSecret; protected boolean _connected; protected boolean _handshook; OortComet(Oort oort,String cometUrl) { super(oort._httpClient,cometUrl,oort._timer); _cometUrl=cometUrl; _oort=oort; addListener(new OortCometListener()); } public boolean isConnected() { return _connected; } public boolean isHandshook() { return _handshook; } @Override protected String extendOut(String message) { if (message==BayeuxClient.Handshake.__HANDSHAKE) { try { Message[] msg = _msgPool.parse(message); Map oort = new HashMap(); oort.put("oort",_oort.getURL()); oort.put("oortSecret",_oort.getSecret()); oort.put("comet",_cometUrl); Map ext = msg[0].getExt(true); ext.put("oort",oort); super.extendOut(msg[0]); message= _msgPool.getJSON().toJSON(msg); for (Message m:msg) if (m instanceof MessageImpl) ((MessageImpl)m).decRef(); } catch (IOException e) { throw new IllegalArgumentException(e); } } else message=super.extendOut(message); System.err.println(_oort.getURL()+" ==> "+message); return message; } @Override protected void metaConnect(boolean success, Message message) { _connected=success; super.metaConnect(success,message); } @Override protected void metaHandshake(boolean success, boolean reestablish, Message message) { synchronized (_oort) { _handshook=success; super.metaHandshake(success,reestablish,message); if (success) { Map ext = (Map)message.get("ext"); if (ext!=null) { Map oort = (Map)ext.get("oort"); if (oort!=null) { _cometSecret=(String)oort.get("cometSecret"); startBatch(); subscribe("/oort/cloud"); for (String channel : _oort._channels) subscribe(channel); publish("/oort/cloud",_oort.getKnownComets(),_cometSecret); endBatch(); } } System.err.println(_oort.getURL()+" <== "+ext); } } } @Override protected void metaPublishFail(Throwable e, Message[] messages) { // TODO Auto-generated method stub super.metaPublishFail(e,messages); } protected class OortCometListener implements MessageListener { public void deliver(Client fromClient, Client toClient, Message msg) { String channelId = msg.getChannel(); if (msg.getData()!=null) { if (channelId.startsWith("/oort/")) { if (channelId.equals("/oort/cloud")) { Object[] data = (Object[])msg.getData(); Set comets = new HashSet(); for (Object o:data) comets.add(o.toString()); _oort.observedComets(comets); } synchronized (_oort) { for( MessageListener listener : _oort._oortMessageListeners) notifyMessageListener(listener, fromClient, toClient, msg); } } else { Channel channel = _oort._bayeux.getChannel(msg.getChannel(),false); if (channel!=null) channel.publish(_oort._oortClient,msg.getData(),msg.getId()); } } } } private void notifyMessageListener(MessageListener listener, Client from, Client to, Message message) { try { listener.deliver(from, to, message); } catch (Throwable x) { Log.debug(x); } } }