package org.cometd.oort; import java.security.SecureRandom; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; import java.util.Timer; import org.cometd.Bayeux; import org.cometd.Client; import org.cometd.Extension; import org.cometd.Message; import org.cometd.MessageListener; import org.cometd.RemoveListener; import org.mortbay.component.AbstractLifeCycle; import org.mortbay.jetty.client.HttpClient; import org.mortbay.log.Log; import org.mortbay.util.ajax.JSON; /* ------------------------------------------------------------ */ /** * Oort cluster of cometd servers. *
* This class maintains a collection of {@link OortComet} instances to each * comet server identified by calls to {@link #observeComet(String)}. The Oort * instance is created and configured by {@link OortServlet}. *
* The key configuration parameter that must be set is the Oort URL, which is * full public URL to the cometd servlet, eg. http://myserver:8080/context/cometd * See {@link OortServlet} for more configuration detail.
* @author gregw
*
*/
public class Oort extends AbstractLifeCycle
{
public final static String OORT_URL = "oort.url";
public final static String OORT_CLOUD = "oort.cloud";
public final static String OORT_CHANNELS = "oort.channels";
public final static String OORT_ATTRIBUTE = "org.cometd.oort.Oort";
protected String _url;
protected String _secret;
protected Bayeux _bayeux;
protected HttpClient _httpClient=new HttpClient();
protected Timer _timer=new Timer();
protected Random _random=new SecureRandom();
protected Client _oortClient;
protected List
* The the comet server is not already observed, start a {@link OortComet}
* instance for it.
*
* @param cometUrl
* @return The {@link OortComet} instance for the comet server.
*/
public OortComet observeComet(String cometUrl)
{
synchronized (this)
{
if (_url.equals(cometUrl))
return null;
OortComet comet = _knownCommets.get(cometUrl);
if (comet==null)
{
try
{
comet = new OortComet(this,cometUrl);
_knownCommets.put(cometUrl,comet);
comet.start();
}
catch(Exception e)
{
throw new IllegalStateException(e);
}
}
return comet;
}
}
/* ------------------------------------------------------------ */
/**
* Pass observed comets.
*
* Called when another comet server publishes it's list of
* known comets to the /oort/cloud channel. If the list contains
* any unknown commets, then {@link #observeComet(String)} is
* called for each.
* @param comets
*/
void observedComets(Set
* Once observed, all {@link OortComet} instances subscribe
* to the channel and will repeat any messages published to
* the local channel (with loop prevention), so that the
* messages are distributed to all Oort comet servers.
* @param channelId
*/
public void observeChannel(String channelId)
{
synchronized (this)
{
if (!_channels.contains(channelId))
{
_channels.add(channelId);
for (OortComet comet : _knownCommets.values())
if (comet.isHandshook())
comet.subscribe(channelId);
}
}
}
/* ------------------------------------------------------------ */
/**
* Add a MessageListener that will receive all messages
* published on /oort/* channels on connected OortComets
* @param listener
*/
public void addOortMessageListener(MessageListener listener)
{
synchronized (this)
{
_oortMessageListeners.add(listener);
}
}
/* ------------------------------------------------------------ */
/**
* Remove an Oort message listener.
* @param listener
* @return true if the listener was removed.
*/
public boolean removeOortClientListener(MessageListener listener)
{
synchronized (this)
{
return _oortMessageListeners.remove(listener);
}
}
/* ------------------------------------------------------------ */
public boolean isOort(Client client)
{
return client==_oortClient;
}
/* ------------------------------------------------------------ */
public String toString()
{
return _url;
}
/* ------------------------------------------------------------ */
/**
* Called to register the details of a successful handshake with an
* Oort comet. A {@link RemoteOortClientListener} instance is added to
* the local Oort client instance.
* @param oortUrl
* @param oortSecret
* @param clientId
*/
protected void oortHandshook(String oortUrl,String oortSecret,String clientId)
{
Log.info(this+": "+clientId+" is oort "+oortUrl);
if (!_knownCommets.containsKey(oortUrl))
observeComet(oortUrl);
Client client = _bayeux.getClient(clientId);
client.addExtension(new RemoteOortClientExtension());
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/**
* Extension to detect incoming handshake from other Oort servers
* and to call {@link Oort#oortHandshook(String, String, String)}.
*
*/
protected class OortExtension implements Extension
{
public Message rcv(Client from, Message message)
{
return message;
}
public Message rcvMeta(Client from, Message message)
{
return message;
}
public Message send(Client from, Message message)
{
return message;
}
public Message sendMeta(Client from, Message message)
{
if (message.getChannel().equals(Bayeux.META_HANDSHAKE) && Boolean.TRUE.equals(message.get(Bayeux.SUCCESSFUL_FIELD)))
{
Message rcv = message.getAssociated();
System.err.println(_url+" --> "+rcv);
Map