// ======================================================================== // Copyright 2006-20078 Mort Bay Consulting Pty. Ltd. // ------------------------------------------------------------------------ // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // http://www.apache.org/licenses/LICENSE-2.0 // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // ======================================================================== package org.mortbay.cometd.client; import java.io.IOException; import java.net.URLEncoder; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import javax.servlet.http.Cookie; import org.cometd.Bayeux; import org.cometd.Client; import org.cometd.ClientListener; import org.cometd.Extension; import org.cometd.Message; import org.cometd.MessageListener; import org.cometd.RemoveListener; import org.mortbay.cometd.MessageImpl; import org.mortbay.cometd.MessagePool; import org.mortbay.component.AbstractLifeCycle; import org.mortbay.io.Buffer; import org.mortbay.io.ByteArrayBuffer; import org.mortbay.jetty.HttpHeaders; import org.mortbay.jetty.HttpSchemes; import org.mortbay.jetty.HttpURI; import org.mortbay.jetty.client.Address; import org.mortbay.jetty.client.ContentExchange; import org.mortbay.jetty.client.HttpClient; import org.mortbay.jetty.client.HttpExchange; import org.mortbay.log.Log; import org.mortbay.util.ArrayQueue; import org.mortbay.util.LazyList; import org.mortbay.util.QuotedStringTokenizer; import org.mortbay.util.ajax.JSON; /* ------------------------------------------------------------ */ /** * Bayeux protocol Client. *

* Implements a Bayeux Ajax Push client as part of the cometd project. *

* The HttpClient attributes are used to share a Timer and MessagePool instance * between all Bayeux clients sharing the same HttpClient. * * @see http://cometd.org * @author gregw * */ public class BayeuxClient extends AbstractLifeCycle implements Client { private final static String __TIMER="org.mortbay.cometd.client.Timer"; private final static String __JSON="org.mortbay.cometd.client.JSON"; private final static String __MSGPOOL="org.mortbay.cometd.MessagePool"; protected HttpClient _httpClient; protected MessagePool _msgPool; private ArrayQueue _inQ = new ArrayQueue(); // queue of incoming messages private ArrayQueue _outQ = new ArrayQueue(); // queue of outgoing messages protected Address _cometdAddress; private Exchange _pull; private Exchange _push; private String _path = "/cometd"; private boolean _initialized = false; private boolean _disconnecting = false; private boolean _handshook = false; private String _clientId; private org.cometd.Listener _listener; private List _rListeners; private List _mListeners; private int _batch; private boolean _formEncoded; private Map _cookies = new ConcurrentHashMap(); private Advice _advice; private Timer _timer; private int _backoffInterval = 0; private int _backoffIncrement = 1000; private int _backoffMaxInterval = 60000; private Buffer _scheme; protected Extension[] _extensions; protected JSON _jsonOut; /* ------------------------------------------------------------ */ public BayeuxClient(HttpClient client, String url) { this(client,url,null); } /* ------------------------------------------------------------ */ public BayeuxClient(HttpClient client, String url, Timer timer) { HttpURI uri = new HttpURI(url); _httpClient = client; _cometdAddress = new Address(uri.getHost(),uri.getPort()); _path=uri.getPath(); _timer = timer; _scheme = (HttpSchemes.HTTPS.equals(uri.getScheme()))?HttpSchemes.HTTPS_BUFFER:HttpSchemes.HTTP_BUFFER; } /* ------------------------------------------------------------ */ public BayeuxClient(HttpClient client, Address address, String path, Timer timer) { _httpClient = client; _cometdAddress = address; _path = path; _timer = timer; } /* ------------------------------------------------------------ */ public BayeuxClient(HttpClient client, Address address, String uri) { this(client,address,uri,null); } /* ------------------------------------------------------------ */ public void addExtension(Extension ext) { _extensions = (Extension[])LazyList.addToArray(_extensions,ext,Extension.class); } /* ------------------------------------------------------------ */ Extension[] getExtensions() { return _extensions; } /* ------------------------------------------------------------ */ /** * If unable to connect/handshake etc, even if following the interval in the * advice, wait for this interval initially, and try again. * * @param interval */ public void setBackOffInterval(int interval) { _backoffInterval = interval; } /* ------------------------------------------------------------ */ /** * @return the backoff interval to wait before retrying an unsuccessful * or failed message */ public int getBackoffInterval() { return _backoffInterval; } /* ------------------------------------------------------------ */ /** * @deprecated We retry an infinite number of times. * use {@link #getBackoffIncrement()} to set limits */ public void setBackoffMaxRetries(int retries) { } /* ------------------------------------------------------------ */ /** * @deprecated */ public int getBackoffMaxRetries() { return -1; } /* ------------------------------------------------------------ */ /** . Each retry will increment by this * intervel, until we reach _backoffMaxInterval * */ public void setBackoffIncrement(int interval) { _backoffIncrement = interval; } /* ------------------------------------------------------------ */ /** * @return the backoff interval used to increase the backoff time when * retrying an unsuccessful or failed message. */ public int getBackoffIncrement() { return _backoffIncrement; } /* ------------------------------------------------------------ */ public void setBackoffMaxInterval(int interval) { _backoffMaxInterval = interval; } public int getBackoffMaxInterval() { return _backoffMaxInterval; } /* ------------------------------------------------------------ */ /* * (non-Javadoc) Returns the clientId * * @see dojox.cometd.Client#getId() */ public String getId() { return _clientId; } /* ------------------------------------------------------------ */ protected void doStart() throws Exception { if (!_httpClient.isStarted()) throw new IllegalStateException("!HttpClient.isStarted()"); synchronized (_httpClient) { if (_jsonOut == null) { _jsonOut = (JSON)_httpClient.getAttribute(__JSON); if (_jsonOut==null) { _jsonOut = new JSON(); _httpClient.setAttribute(__JSON,_jsonOut); } } if (_timer == null) { _timer = (Timer)_httpClient.getAttribute(__TIMER); if (_timer==null) { _timer = new Timer(__TIMER+"@"+hashCode(),true); _httpClient.setAttribute(__TIMER,_timer); } } if (_msgPool == null) { _msgPool = (MessagePool)_httpClient.getAttribute(__MSGPOOL); if (_msgPool==null) { _msgPool = new MessagePool(); _httpClient.setAttribute(__MSGPOOL,_msgPool); } } } _disconnecting=false; _pull=null; _push=null; super.doStart(); synchronized (_outQ) { if (!_initialized && _pull == null) { _pull = new Handshake(); send((Exchange)_pull,false); } } } /* ------------------------------------------------------------ */ protected void doStop() throws Exception { if (!_disconnecting) disconnect(); super.doStop(); } /* ------------------------------------------------------------ */ public boolean isPolling() { synchronized (_outQ) { return isRunning() && (_pull != null); } } /* ------------------------------------------------------------ */ /** * (non-Javadoc) */ public void deliver(Client from, Message message) { if (!isRunning()) throw new IllegalStateException("Not running"); synchronized (_inQ) { if (_mListeners == null) _inQ.add(message); else { for (MessageListener l : _mListeners) notifyMessageListener(l, from, message); } } } private void notifyMessageListener(MessageListener listener, Client from, Message message) { try { listener.deliver(from, this, message); } catch (Throwable x) { Log.debug(x); } } /* ------------------------------------------------------------ */ /* * (non-Javadoc) * * @see dojox.cometd.Client#deliver(dojox.cometd.Client, java.lang.String, * java.lang.Object, java.lang.String) */ public void deliver(Client from, String toChannel, Object data, String id) { if (!isRunning()) throw new IllegalStateException("Not running"); MessageImpl message = _msgPool.newMessage(); message.put(Bayeux.CHANNEL_FIELD,toChannel); message.put(Bayeux.DATA_FIELD,data); if (id != null) message.put(Bayeux.ID_FIELD,id); synchronized (_inQ) { if (_mListeners == null) { message.incRef(); _inQ.add(message); } else { for (MessageListener l : _mListeners) if (l instanceof MessageListener.Synchronous) notifyMessageListener(l, from, message); } } if (_mListeners !=null) for (MessageListener l : _mListeners) if (!(l instanceof MessageListener.Synchronous)) notifyMessageListener(l, from, message); message.decRef(); } /* ------------------------------------------------------------ */ /** * @deprecated */ public org.cometd.Listener getListener() { synchronized (_inQ) { return _listener; } } /* ------------------------------------------------------------ */ /* * (non-Javadoc) * * @see dojox.cometd.Client#hasMessages() */ public boolean hasMessages() { synchronized (_inQ) { return _inQ.size() > 0; } } /* ------------------------------------------------------------ */ /* * (non-Javadoc) * * @see dojox.cometd.Client#isLocal() */ public boolean isLocal() { return false; } /* ------------------------------------------------------------ */ /* * (non-Javadoc) * * @see dojox.cometd.Client#subscribe(java.lang.String) */ private void publish(MessageImpl msg) { msg.incRef(); synchronized (_outQ) { _outQ.add(msg); if (_batch == 0 && _initialized && _push == null) { _push = new Publish(); try { send(_push); } catch (IOException e) { metaPublishFail(e,((Publish)_push).getOutboundMessages()); } catch (IllegalStateException e) { metaPublishFail(e,((Publish)_push).getOutboundMessages()); } } } } /* ------------------------------------------------------------ */ /* * (non-Javadoc) * * @see dojox.cometd.Client#publish(java.lang.String, java.lang.Object, * java.lang.String) */ public void publish(String toChannel, Object data, String msgId) { if (!isRunning() || _disconnecting) throw new IllegalStateException("Not running"); MessageImpl msg = _msgPool.newMessage(); msg.put(Bayeux.CHANNEL_FIELD,toChannel); msg.put(Bayeux.DATA_FIELD,data); if (msgId != null) msg.put(Bayeux.ID_FIELD,msgId); publish(msg); msg.decRef(); } /* ------------------------------------------------------------ */ /* * (non-Javadoc) * * @see dojox.cometd.Client#subscribe(java.lang.String) */ public void subscribe(String toChannel) { if (!isRunning() || _disconnecting) throw new IllegalStateException("Not running"); MessageImpl msg = _msgPool.newMessage(); msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_SUBSCRIBE); msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel); publish(msg); msg.decRef(); } /* ------------------------------------------------------------ */ /* * (non-Javadoc) * * @see dojox.cometd.Client#unsubscribe(java.lang.String) */ public void unsubscribe(String toChannel) { if (!isRunning() || _disconnecting) throw new IllegalStateException("Not running"); MessageImpl msg = _msgPool.newMessage(); msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_UNSUBSCRIBE); msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel); publish(msg); msg.decRef(); } /* ------------------------------------------------------------ */ /** * Disconnect this client. * @deprecated use {@link #disconnect()} */ public void remove() { disconnect(); } /* ------------------------------------------------------------ */ /** * Disconnect this client. */ public void disconnect() { if (isStopped() || _disconnecting) throw new IllegalStateException("Not running"); MessageImpl msg = _msgPool.newMessage(); msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_DISCONNECT); synchronized (_outQ) { _outQ.add(msg); _disconnecting = true; if (_batch == 0 && _initialized && _push == null) { _push = new Publish(); try { send(_push); } catch (IOException e) { Log.warn(e.toString()); Log.debug(e); send(_push,true); } } _initialized = false; } } /* ------------------------------------------------------------ */ /** * @deprecated */ public void setListener(org.cometd.Listener listener) { synchronized (_inQ) { if (_listener != null) removeListener(_listener); _listener = listener; if (_listener != null) addListener(_listener); } } /* ------------------------------------------------------------ */ /* * (non-Javadoc) Removes all available messages from the inbound queue. If a * listener is set then messages are not queued. * * @see dojox.cometd.Client#takeMessages() */ public List takeMessages() { final LinkedList list; synchronized (_inQ) { list = new LinkedList(_inQ); _inQ.clear(); } for (Message m : list) if (m instanceof MessageImpl) ((MessageImpl)m).decRef(); return list; } /* ------------------------------------------------------------ */ /* * (non-Javadoc) * * @see dojox.cometd.Client#endBatch() */ public void endBatch() { synchronized (_outQ) { if (--_batch <= 0) { _batch = 0; if ((_initialized || _disconnecting) && _push == null && _outQ.size() > 0) { _push = new Publish(); try { send(_push); } catch (IOException e) { metaPublishFail(e,((Publish)_push).getOutboundMessages()); } } } } } /* ------------------------------------------------------------ */ /* * (non-Javadoc) * * @see dojox.cometd.Client#startBatch() */ public void startBatch() { if (isStopped()) throw new IllegalStateException("Not running"); synchronized (_outQ) { _batch++; } } /* ------------------------------------------------------------ */ /** * Customize an Exchange. Called when an exchange is about to be sent to * allow Cookies and Credentials to be customized. Default implementation * sets any cookies */ protected void customize(HttpExchange exchange) { StringBuilder builder = null; for (String cookieName : _cookies.keySet()) { if (builder == null) builder = new StringBuilder(); else builder.append("; "); // Expiration is handled by getCookie() Cookie cookie = getCookie(cookieName); if (cookie != null) { builder.append(cookie.getName()); // TODO quotes builder.append("="); builder.append(cookie.getValue()); // TODO quotes } } if (builder != null) exchange.setRequestHeader(HttpHeaders.COOKIE,builder.toString()); if (_scheme!=null) exchange.setScheme(_scheme); } /* ------------------------------------------------------------ */ public void setCookie(Cookie cookie) { long expirationTime = System.currentTimeMillis(); int maxAge = cookie.getMaxAge(); if (maxAge < 0) expirationTime = -1L; else expirationTime += maxAge * 1000; ExpirableCookie expirableCookie = new ExpirableCookie(cookie, expirationTime); _cookies.put(cookie.getName(), expirableCookie); } public Cookie getCookie(String name) { ExpirableCookie cookie = _cookies.get(name); if (cookie != null) { if (cookie.isExpired()) { _cookies.remove(name); cookie = null; } } return cookie == null ? null : cookie.cookie; } /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ /** * The base class for all bayeux exchanges. */ protected class Exchange extends ContentExchange { Message[] _responses; int _connectFailures; int _backoff = _backoffInterval; String _json; /* ------------------------------------------------------------ */ Exchange(String info) { setMethod("POST"); setScheme(HttpSchemes.HTTP_BUFFER); setAddress(_cometdAddress); setURI(_path + "/" + info); setRequestContentType(_formEncoded?"application/x-www-form-urlencoded;charset=utf-8":"text/json;charset=utf-8"); } /* ------------------------------------------------------------ */ public int getBackoff() { return _backoff; } /* ------------------------------------------------------------ */ public void incBackoff() { _backoff = Math.min(_backoff+_backoffIncrement,_backoffMaxInterval); } /* ------------------------------------------------------------ */ protected void setMessage(String message) { message=extendOut(message); setJson(message); } /* ------------------------------------------------------------ */ protected void setJson(String json) { try { _json = json; if (_formEncoded) setRequestContent(new ByteArrayBuffer("message=" + URLEncoder.encode(_json,"utf-8"))); else setRequestContent(new ByteArrayBuffer(_json,"utf-8")); } catch (Exception e) { Log.ignore(e); setRequestContent(new ByteArrayBuffer(_json)); } } /* ------------------------------------------------------------ */ protected void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException { super.onResponseStatus(version,status,reason); } /* ------------------------------------------------------------ */ protected void onResponseHeader(Buffer name, Buffer value) throws IOException { super.onResponseHeader(name,value); if (!isRunning()) return; if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.SET_COOKIE_ORDINAL) { String cname = null; String cvalue = null; QuotedStringTokenizer tok = new QuotedStringTokenizer(value.toString(),"=;",false,false); tok.setSingle(false); if (tok.hasMoreElements()) cname = tok.nextToken(); if (tok.hasMoreElements()) cvalue = tok.nextToken(); Cookie cookie = new Cookie(cname,cvalue); while (tok.hasMoreTokens()) { String token = tok.nextToken(); if ("Version".equalsIgnoreCase(token)) cookie.setVersion(Integer.parseInt(tok.nextToken())); else if ("Comment".equalsIgnoreCase(token)) cookie.setComment(tok.nextToken()); else if ("Path".equalsIgnoreCase(token)) cookie.setPath(tok.nextToken()); else if ("Domain".equalsIgnoreCase(token)) cookie.setDomain(tok.nextToken()); else if ("Expires".equalsIgnoreCase(token)) { try { Date date = new SimpleDateFormat("EEE, dd-MMM-yy HH:mm:ss 'GMT'").parse(tok.nextToken()); Long maxAge = TimeUnit.MILLISECONDS.toSeconds(date.getTime() - System.currentTimeMillis()); cookie.setMaxAge(maxAge > 0 ? maxAge.intValue() : 0); } catch (ParseException ignored) { } } else if ("Max-Age".equalsIgnoreCase(token)) { try { int maxAge = Integer.parseInt(tok.nextToken()); cookie.setMaxAge(maxAge); } catch (NumberFormatException ignored) { } } else if ("Secure".equalsIgnoreCase(token)) cookie.setSecure(true); } BayeuxClient.this.setCookie(cookie); } } /* ------------------------------------------------------------ */ protected void onResponseComplete() throws IOException { if (!isRunning()) return; super.onResponseComplete(); if (getResponseStatus() == 200) { String content = getResponseContent(); // TODO if (content == null || content.length() == 0) throw new IllegalStateException(); _responses = _msgPool.parse(content); if (_responses!=null) for (int i=0;i<_responses.length;i++) extendIn(_responses[i]); } } /* ------------------------------------------------------------ */ protected void resend(boolean backoff) { if (!isRunning()) return; final boolean disconnecting; synchronized (_outQ) { disconnecting=_disconnecting; } if (disconnecting) { try{stop();}catch(Exception e){Log.ignore(e);} return; } setJson(_json); if (!send(this,backoff)) Log.warn("Retries exhausted"); // giving up } /* ------------------------------------------------------------ */ protected void recycle() { if (_responses!=null) for (Message msg:_responses) if (msg instanceof MessageImpl) ((MessageImpl)msg).decRef(); _responses=null; } } /* ------------------------------------------------------------ */ /** * The Bayeux handshake exchange. Negotiates a client Id and initializes the * protocol. * */ protected class Handshake extends Exchange { public final static String __HANDSHAKE = "[{" + "\"channel\":\"/meta/handshake\"," + "\"version\":\"0.9\"," + "\"minimumVersion\":\"0.9\"" + "}]"; Handshake() { super("handshake"); setMessage(__HANDSHAKE); } /* ------------------------------------------------------------ */ /* * (non-Javadoc) * * @see * org.mortbay.cometd.client.BayeuxClient.Exchange#onResponseComplete() */ protected void onResponseComplete() throws IOException { super.onResponseComplete(); if (!isRunning()) return; if (_disconnecting) { Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("failure","expired"); metaHandshake(false,false,error); try{stop();}catch(Exception e){Log.ignore(e);} return; } if (getResponseStatus() == 200 && _responses != null && _responses.length > 0) { MessageImpl response = (MessageImpl)_responses[0]; boolean successful = response.isSuccessful(); // Get advice if there is any Map adviceField = (Map)response.get(Bayeux.ADVICE_FIELD); if (adviceField != null) _advice = new Advice(adviceField); if (successful) { _handshook = true; if (Log.isDebugEnabled()) Log.debug("Successful handshake, sending connect"); _clientId = (String)response.get(Bayeux.CLIENT_FIELD); metaHandshake(true,_handshook,response); _pull = new Connect(); send(_pull,false); } else { metaHandshake(false,false,response); _handshook = false; if (_advice != null && _advice.isReconnectNone()) throw new IOException("Handshake failed with advice reconnect=none :" + _responses[0]); else if (_advice != null && _advice.isReconnectHandshake()) { _pull = new Handshake(); if (!send(_pull,true)) throw new IOException("Handshake, retries exhausted"); } else // assume retry = reconnect? { _pull = new Connect(); if (!send(_pull,true)) throw new IOException("Connect after handshake, retries exhausted"); } } } else { Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("status",new Integer(getResponseStatus())); error.put("content",getResponseContent()); metaHandshake(false,false,error); resend(true); } recycle(); } /* ------------------------------------------------------------ */ protected void onExpire() { // super.onExpire(); Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("failure","expired"); metaHandshake(false,false,error); resend(true); } /* ------------------------------------------------------------ */ protected void onConnectionFailed(Throwable ex) { // super.onConnectionFailed(ex); Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("failure",ex.toString()); error.put("exception",ex); ex.printStackTrace(); metaHandshake(false,false,error); resend(true); } /* ------------------------------------------------------------ */ protected void onException(Throwable ex) { // super.onException(ex); Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("failure",ex.toString()); error.put("exception",ex); metaHandshake(false,false,error); resend(true); } } /* ------------------------------------------------------------ */ /** * The Bayeux Connect exchange. Connect exchanges implement the long poll * for Bayeux. */ protected class Connect extends Exchange { String _connectString; Connect() { super("connect"); _connectString = "{" + "\"channel\":\"/meta/connect\"," + "\"clientId\":\"" + _clientId + "\"," + "\"connectionType\":\"long-polling\"" + "}"; setMessage(_connectString); } protected void onResponseComplete() throws IOException { super.onResponseComplete(); if (!isRunning()) return; if (getResponseStatus() == 200 && _responses != null && _responses.length > 0) { try { startBatch(); for (int i = 0; i < _responses.length; i++) { Message msg = _responses[i]; // get advice if there is any Map adviceField = (Map)msg.get(Bayeux.ADVICE_FIELD); if (adviceField != null) _advice = new Advice(adviceField); if (Bayeux.META_CONNECT.equals(msg.get(Bayeux.CHANNEL_FIELD))) { Boolean successful = (Boolean)msg.get(Bayeux.SUCCESSFUL_FIELD); if (successful != null && successful.booleanValue()) { metaConnect(true,msg); if (!isRunning()) break; synchronized (_outQ) { if (_disconnecting) continue; if (!isInitialized()) { setInitialized(true); { if (_outQ.size() > 0) { _push = new Publish(); send(_push); } } } } // send a Connect (ie longpoll) possibly with // delay according to interval advice _pull = new Connect(); send(_pull,false); } else { // received a failure to our connect message, // check the advice to see what to do: // reconnect: none = hard error // reconnect: handshake = send a handshake // message // reconnect: retry = send another connect, // possibly using interval setInitialized(false); metaConnect(false,msg); synchronized(_outQ) { if (!isRunning()||_disconnecting) break; } if (_advice != null && _advice.isReconnectNone()) throw new IOException("Connect failed, advice reconnect=none"); else if (_advice != null && _advice.isReconnectHandshake()) { if (Log.isDebugEnabled()) Log.debug("connect received success=false, advice is to rehandshake"); _pull = new Handshake(); send(_pull,true); } else { // assume retry = reconnect if (Log.isDebugEnabled()) Log.debug("Assuming retry=reconnect"); resend(true); } } } deliver(null,msg); } } finally { endBatch(); } } else { Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("status",getResponseStatus()); error.put("content",getResponseContent()); metaConnect(false,error); resend(true); } recycle(); } /* ------------------------------------------------------------ */ protected void onExpire() { // super.onExpire(); setInitialized(false); Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("failure","expired"); metaConnect(false,error); resend(true); } /* ------------------------------------------------------------ */ protected void onConnectionFailed(Throwable ex) { // super.onConnectionFailed(ex); setInitialized(false); Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("failure",ex.toString()); error.put("exception",ex); metaConnect(false,error); resend(true); } /* ------------------------------------------------------------ */ protected void onException(Throwable ex) { // super.onException(ex); setInitialized(false); Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("failure",ex.toString()); error.put("exception",ex); metaConnect(false,error); resend(true); } } /* ------------------------------------------------------------ */ /** * Publish message exchange. Sends messages to bayeux server and handles any * messages received as a result. */ protected class Publish extends Exchange { Publish() { super("publish"); StringBuffer json = new StringBuffer(256); synchronized (json) { synchronized (_outQ) { int s=_outQ.size(); if (s == 0) return; for (int i=0;i 0) { for (int i = 0; i < _responses.length; i++) { MessageImpl msg = (MessageImpl)_responses[i]; deliver(null,msg); if (Bayeux.META_DISCONNECT.equals(msg.getChannel())&&msg.isSuccessful()) { if (isStarted()) { try{stop();}catch(Exception e){Log.ignore(e);} } break; } } } else { Log.warn("Publish, error=" + getResponseStatus()); } } finally { endBatch(); } recycle(); } /* ------------------------------------------------------------ */ protected void onExpire() { super.onExpire(); metaPublishFail(null,this.getOutboundMessages()); if (_disconnecting) { try{stop();}catch(Exception e){Log.ignore(e);} } } /* ------------------------------------------------------------ */ protected void onConnectionFailed(Throwable ex) { super.onConnectionFailed(ex); metaPublishFail(ex,this.getOutboundMessages()); if (_disconnecting) { try{stop();}catch(Exception e){Log.ignore(e);} } } /* ------------------------------------------------------------ */ protected void onException(Throwable ex) { super.onException(ex); metaPublishFail(ex,this.getOutboundMessages()); if (_disconnecting) { try{stop();}catch(Exception e){Log.ignore(e);} } } } /* ------------------------------------------------------------ */ public void addListener(ClientListener listener) { synchronized (_inQ) { boolean added=false; if (listener instanceof MessageListener) { added=true; if (_mListeners == null) _mListeners = new ArrayList(); _mListeners.add((MessageListener)listener); } if (listener instanceof RemoveListener) { added=true; if (_rListeners == null) _rListeners = new ArrayList(); _rListeners.add((RemoveListener)listener); } if (!added) throw new IllegalArgumentException(); } } /* ------------------------------------------------------------ */ public void removeListener(ClientListener listener) { synchronized (_inQ) { if (listener instanceof MessageListener) { if (_mListeners != null) _mListeners.remove((MessageListener)listener); } if (listener instanceof RemoveListener) { if (_rListeners != null) _rListeners.remove((RemoveListener)listener); } } } /* ------------------------------------------------------------ */ public int getMaxQueue() { return -1; } /* ------------------------------------------------------------ */ public Queue getQueue() { return _inQ; } /* ------------------------------------------------------------ */ public void setMaxQueue(int max) { if (max != -1) throw new UnsupportedOperationException(); } /* ------------------------------------------------------------ */ /** * Send the exchange, possibly using a backoff. * * @param exchange * @param backoff * if true, use backoff algorithm to send * @return */ protected boolean send(final Exchange exchange, final boolean backoff) { long interval = (_advice != null?_advice.getInterval():0); if (backoff) { int backoffInterval = exchange.getBackoff(); if (Log.isDebugEnabled()) Log.debug("Send with backoff, interval=" + backoffInterval + " for " + exchange); exchange.incBackoff(); interval += backoffInterval; } if (interval > 0) { TimerTask task = new TimerTask() { public void run() { try { send(exchange); } catch (IOException e) { Log.warn("Delayed send, retry: "+e); Log.debug(e); send(exchange,true); } catch (IllegalStateException e) { Log.warn("Delayed send, retry: "+e); Log.debug(e); send(exchange,true); } } }; if (Log.isDebugEnabled()) Log.debug("Delay " + interval + " send of " + exchange); _timer.schedule(task,interval); } else { try { send(exchange); } catch (IOException e) { Log.warn("Send, retry on fail: "+e); Log.debug(e); return send(exchange,true); } catch (IllegalStateException e) { Log.warn("Send, retry on fail: "+e); Log.debug(e); return send(exchange,true); } } return true; } /* ------------------------------------------------------------ */ /** * Send the exchange. * * @param exchange * @throws IOException */ protected void send(HttpExchange exchange) throws IOException { exchange.reset(); // ensure at start state customize(exchange); if (Log.isDebugEnabled()) Log.debug("Send: using any connection=" + exchange); _httpClient.send(exchange); // use any connection } /* ------------------------------------------------------------ */ /** * False when we have received a success=false message in response to a * Connect, or we have had an exception when sending or receiving a Connect. * * True when handshake and then connect has happened. * * @param b */ protected void setInitialized(boolean b) { synchronized (_outQ) { _initialized = b; } } /* ------------------------------------------------------------ */ protected boolean isInitialized() { return _initialized; } /* ------------------------------------------------------------ */ /** * Called with the results of a /meta/connect message * @param success connect was returned with this status */ protected void metaConnect(boolean success, Message message) { if (!success) Log.warn(this.toString()+" "+message.toString()); } /* ------------------------------------------------------------ */ /** * Called with the results of a /meta/handshake message * @param success connect was returned with this status * @param reestablish the client was previously connected. */ protected void metaHandshake(boolean success, boolean reestablish, Message message) { if (!success) Log.warn(this.toString()+" "+message.toString()); } /* ------------------------------------------------------------ */ /** * Called with the results of a failed publish */ protected void metaPublishFail(Throwable e, Message[] messages) { Log.warn(this.toString()+": "+e); Log.debug(e); } /* ------------------------------------------------------------ */ /** Called to extend outbound string messages. * Some messages are sent as preformatted JSON strings (eg handshake * and connect messages). This extendOut method is a variation of the * {@link #extendOut(Message)} method to efficiently cater for these * preformatted strings. *

* This method calls the {@link Extension}s added by {@link #addExtension(Extension)} * * @param msg * @return the extended message */ protected String extendOut(String msg) { if (_extensions==null) return msg; try { Message[] messages = _msgPool.parse(msg); for (int i=0; i * This method calls the {@link Extension}s added by {@link #addExtension(Extension)} * */ protected void extendOut(Message message) { if (_extensions!=null) { Message m = message; if (m.getChannel().startsWith(Bayeux.META_SLASH)) for (int i=0;m!=null && i<_extensions.length;i++) m=_extensions[i].sendMeta(this,m); else for (int i=0;m!=null && i<_extensions.length;i++) m=_extensions[i].send(this,m); if (message!=m) { message.clear(); if (m!=null) for (Map.Entry entry:m.entrySet()) message.put(entry.getKey(),entry.getValue()); } } } /* ------------------------------------------------------------ */ /** Called to extend inbound messages *

* This method calls the {@link Extension}s added by {@link #addExtension(Extension)} * */ protected void extendIn(Message message) { if (_extensions!=null) { Message m = message; if (m.getChannel().startsWith(Bayeux.META_SLASH)) for (int i=_extensions.length;m!=null && i-->0;) m=_extensions[i].rcvMeta(this,m); else for (int i=_extensions.length;m!=null && i-->0;) m=_extensions[i].rcv(this,m); if (message!=m) { message.clear(); if (m!=null) for (Map.Entry entry:m.entrySet()) message.put(entry.getKey(),entry.getValue()); } } } private static class ExpirableCookie { private final Cookie cookie; private final long expirationTime; private ExpirableCookie(Cookie cookie, long expirationTime) { this.cookie = cookie; this.expirationTime = expirationTime; } private boolean isExpired() { if (expirationTime < 0) return false; return System.currentTimeMillis() >= expirationTime; } } }