// ======================================================================== // Copyright 2008 Mort Bay Consulting Pty. Ltd. // ------------------------------------------------------------------------ // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // http://www.apache.org/licenses/LICENSE-2.0 // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //======================================================================== package org.mortbay.cometd; import java.lang.reflect.Method; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.cometd.Bayeux; import org.cometd.Channel; import org.cometd.Client; import org.cometd.Listener; import org.cometd.Message; import org.cometd.MessageListener; import org.mortbay.component.LifeCycle; import org.mortbay.log.Log; import org.mortbay.thread.QueuedThreadPool; import org.mortbay.thread.ThreadPool; /* ------------------------------------------------------------ */ /** * Abstract Bayeux Service class. This is a base class to assist with the * creation of server side @ link Bayeux} clients that provide services to * remote Bayeux clients. The class provides a Bayeux {@link Client} and * {@link Listener} together with convenience methods to map subscriptions to * methods on the derived class and to send responses to those methods. * *

* If a {@link #set_threadPool(ThreadPool)} is set, then messages are handled in * their own threads. This is desirable if the handling of a message can take * considerable time and it is desired not to hold up the delivering thread * (typically a HTTP request handling thread). * *

* If the BayeuxService is constructed asynchronously (the default), then * messages are delivered unsynchronized and multiple simultaneous calls to * handling methods may occur. * *

* If the BayeuxService is constructed as a synchronous service, then message * delivery is synchronized on the internal {@link Client} instances used and * only a single call will be made to the handler method (unless a thread pool * is used). * * @see MessageListener * @author gregw * */ public abstract class BayeuxService { private String _name; private Bayeux _bayeux; private Client _client; private Map _methods=new ConcurrentHashMap(); private ThreadPool _threadPool; private MessageListener _listener; private boolean _seeOwn=false; /* ------------------------------------------------------------ */ /** * Instantiate the service. Typically the derived constructor will call @ * #subscribe(String, String)} to map subscriptions to methods. * * @param bayeux * The bayeux instance. * @param name * The name of the service (used as client ID prefix). */ public BayeuxService(Bayeux bayeux, String name) { this(bayeux,name,0,false); } /* ------------------------------------------------------------ */ /** * Instantiate the service. Typically the derived constructor will call @ * #subscribe(String, String)} to map subscriptions to methods. * * @param bayeux * The bayeux instance. * @param name * The name of the service (used as client ID prefix). * @param maxThreads * The size of a ThreadPool to create to handle messages. */ public BayeuxService(Bayeux bayeux, String name, int maxThreads) { this(bayeux,name,maxThreads,false); } /* ------------------------------------------------------------ */ /** * Instantiate the service. Typically the derived constructor will call @ * #subscribe(String, String)} to map subscriptions to methods. * * @param bayeux * The bayeux instance. * @param name * The name of the service (used as client ID prefix). * @param maxThreads * The size of a ThreadPool to create to handle messages. * @param synchronous * True if message delivery will be synchronized on the client. */ public BayeuxService(Bayeux bayeux, String name, int maxThreads, boolean synchronous) { if (maxThreads > 0) setThreadPool(new QueuedThreadPool(maxThreads)); _name=name; _bayeux=bayeux; _client=_bayeux.newClient(name); _listener=(synchronous)?new SyncListen():new AsyncListen(); _client.addListener(_listener); } /* ------------------------------------------------------------ */ public Bayeux getBayeux() { return _bayeux; } /* ------------------------------------------------------------ */ public Client getClient() { return _client; } /* ------------------------------------------------------------ */ public ThreadPool getThreadPool() { return _threadPool; } /* ------------------------------------------------------------ */ /** * Set the threadpool. If the {@link ThreadPool} is a {@link LifeCycle}, * then it is started by this method. * * @param pool */ public void setThreadPool(ThreadPool pool) { try { if (pool instanceof LifeCycle) if (!((LifeCycle)pool).isStarted()) ((LifeCycle)pool).start(); } catch(Exception e) { throw new IllegalStateException(e); } _threadPool=pool; } /* ------------------------------------------------------------ */ public boolean isSeeOwnPublishes() { return _seeOwn; } /* ------------------------------------------------------------ */ public void setSeeOwnPublishes(boolean own) { _seeOwn=own; } /* ------------------------------------------------------------ */ /** * Subscribe to a channel. Subscribe to channel and map a method to handle * received messages. The method must have a unique name and one of the * following signatures: *