package org.mortbay.cometd.client; import java.io.InputStreamReader; import java.io.LineNumberReader; import java.security.SecureRandom; import java.util.ArrayList; import java.util.Timer; import java.util.concurrent.atomic.AtomicInteger; import org.cometd.Bayeux; import org.cometd.Client; import org.cometd.Message; import org.cometd.MessageListener; import org.mortbay.cometd.AbstractBayeux; import org.mortbay.cometd.client.ext.TimesyncClientExtension; import org.mortbay.jetty.client.Address; import org.mortbay.jetty.client.HttpClient; import org.mortbay.thread.QueuedThreadPool; import org.mortbay.util.ajax.JSON; public class BayeuxLoadGenerator { private static final long classLoadTime = System.currentTimeMillis(); SecureRandom _random= new SecureRandom(); HttpClient _httpClient; ArrayList clients=new ArrayList(); long _minLatency; long _maxLatency; long _totalLatency; long _got; AtomicInteger _subscribed = new AtomicInteger(); public BayeuxLoadGenerator() throws Exception { _httpClient=new HttpClient(); _httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); // http.setConnectorType(HttpClient.CONNECTOR_SOCKET); _httpClient.setMaxConnectionsPerAddress(40000); QueuedThreadPool pool = new QueuedThreadPool(); pool.setMaxThreads(500); pool.setDaemon(true); _httpClient.setThreadPool(pool); _httpClient.start(); } public void generateLoad() throws Exception { LineNumberReader in = new LineNumberReader(new InputStreamReader(System.in)); System.err.print("protocol[http]: "); String t = in.readLine().trim(); if (t.length()==0) t="http"; String protocol=t; System.err.print("server[localhost]: "); t = in.readLine().trim(); if (t.length()==0) t="localhost"; String host=t; System.err.print("port[8080]: "); t = in.readLine().trim(); if (t.length()==0) t="8080"; int port = Integer.parseInt(t); System.err.print("context[/cometd]: "); t = in.readLine().trim(); if (t.length()==0) t="/cometd"; String uri=t+"/cometd"; String url = protocol+"://"+host+":"+port+uri; int nclients=100; int size=50; int rooms=100; int rooms_per_client=1; int publish=1000; int pause=100; int burst=10; int maxLatency=5000; System.err.print("base[/chat/demo]: "); t = in.readLine().trim(); if (t.length()==0) t="/chat/demo"; String base = t; System.err.print("rooms ["+rooms+"]: "); t = in.readLine().trim(); if (t.length()==0) t=""+rooms; rooms=Integer.parseInt(t); System.err.print("rooms per client ["+rooms_per_client+"]: "); t = in.readLine().trim(); if (t.length()==0) t=""+rooms_per_client; rooms_per_client=Integer.parseInt(t); System.err.print("max Latency ["+maxLatency+"]: "); t = in.readLine().trim(); if (t.length()==0) t=""+maxLatency; maxLatency=Integer.parseInt(t); while(true) { System.err.println("--"); System.err.print("clients ["+nclients+"]: "); t = in.readLine(); if (t==null) return; t = t.trim(); if (t.length()==0) t=""+nclients; nclients=Integer.parseInt(t); if (nclients classLoadTime) { long latency= System.currentTimeMillis()-l; synchronized(BayeuxLoadGenerator.this) { _got++; if (_maxLatency=100?"0":i>=10?"00":"000")+i); Thread.sleep(150); } client.startBatch(); if (rooms_per_client==1) { int room=u%rooms; client.subscribe(room>0?(base+"/"+room):base); } else { for (int i=0;i0?(base+"/"+room):base); } } client.endBatch(); } while (clients.size()>nclients) { BayeuxClient client=clients.remove(0); client.disconnect(); _subscribed.addAndGet(-rooms_per_client); if (clients.size()%10==0) { int i=clients.size(); System.err.println("clients = "+(i>=1000?"":i>=100?"0":i>=10?"00":"000")+i); Thread.sleep(100); } } Thread.sleep(500); while(_subscribed.get()!=nclients*rooms_per_client) { // System.err.println(destination.toDetailString()); System.err.println("Subscribed:"+_subscribed.get()+" != "+(nclients*rooms_per_client)+" ..."); Thread.sleep(1000); } System.err.println("Clients: "+nclients+" subscribed:"+_subscribed.get()); synchronized(this) { _got=0; _minLatency=0; _maxLatency=0; _totalLatency=0; } System.err.print("publish ["+publish+"]: "); t = in.readLine().trim(); if (t.length()==0) t=""+publish; publish=Integer.parseInt(t); System.err.print("publish size ["+size+"]: "); t = in.readLine().trim(); if (t.length()==0) t=""+size; size=Integer.parseInt(t); String chat=""; for (int i=0;i=1000?"":u>=100?"0":u>=10?"00":"000")+u; Object msg=new JSON.Literal("{\"user\":\""+name+"\",\"chat\":\""+chat+" "+i+"\"}"); c.startBatch(); for (int b=0;b0?(base+"/"+room):base,msg,id); i++; if (i%10==0) { long latency=0; synchronized(this) { if (_got>0) latency=_totalLatency/_got; } if (latency>maxLatency) { System.err.println("\nABORTED!"); break trial; } char dot=(char)('0'+(int)(latency/100)); System.err.print(dot); if (i%1000==0) System.err.println(); } } c.endBatch(); if (pause>0) Thread.sleep(pause); } if (_got>0) Thread.sleep(_totalLatency/_got); else Thread.sleep(100); for (BayeuxClient c : clients) { if (!c.isPolling()) System.err.println("PROBLEM WITH "+c); } System.err.println(); long last=0; int sleep=100; while (_got<(nclients/rooms*rooms_per_client*publish)) { System.err.println("Got:"+_got+" < "+(nclients/rooms*rooms_per_client*publish)+" ..."); Thread.sleep(sleep); if (last!=0 && _got==last) break; last=_got; sleep+=100; } System.err.println("Got:"+_got+" of "+(nclients/rooms*rooms_per_client*publish)); long end=System.currentTimeMillis(); System.err.println("Got "+_got+" at "+(_got*1000/(end-start))+"/s, latency min/ave/max ="+_minLatency+"/"+(_totalLatency/_got)+"/"+_maxLatency+"ms"); } } public static void main(String[] args) throws Exception { BayeuxLoadGenerator gen = new BayeuxLoadGenerator(); gen.generateLoad(); } }