java/src/org/jruby/jubilee/Server.java in jubilee-0.5.0 vs java/src/org/jruby/jubilee/Server.java in jubilee-1.0.0.beta1
- old
+ new
@@ -1,149 +1,168 @@
package org.jruby.jubilee;
import org.jruby.*;
import org.jruby.anno.JRubyMethod;
+import org.jruby.jubilee.vertx.JubileeVertx;
import org.jruby.runtime.Block;
import org.jruby.runtime.ObjectAllocator;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
-import org.vertx.java.core.VertxFactory;
import org.vertx.java.core.http.HttpServer;
import org.vertx.java.core.http.HttpServerRequest;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
public class Server extends RubyObject {
- final private Vertx vertx;
- final private HttpServer httpServer;
- private RackApplication app;
- private boolean running;
- private boolean ssl = false;
- private String keyStorePath;
- private String keyStorePassword;
- private String eventBusPrefix;
- private int numberOfWorkers;
- private int port;
- private String host;
+ private Vertx vertx;
+ private HttpServer httpServer;
+ private RackApplication app;
+ private boolean running = false;
+ private boolean ssl = false;
+ private String keyStorePath;
+ private String keyStorePassword;
+ private String eventBusPrefix;
+ private int numberOfWorkers;
+ private int port;
+ private String host;
+ private int clusterPort;
+ private String clusterHost;
- public static void createServerClass(Ruby runtime) {
- RubyModule mJubilee = runtime.defineModule("Jubilee");
- RubyClass serverClass = mJubilee.defineClassUnder("VertxServer", runtime.getObject(), ALLOCATOR);
- serverClass.defineAnnotatedMethods(Server.class);
- }
+ public static void createServerClass(Ruby runtime) {
+ RubyModule mJubilee = runtime.defineModule("Jubilee");
+ RubyClass serverClass = mJubilee.defineClassUnder("VertxServer", runtime.getObject(), ALLOCATOR);
+ serverClass.defineAnnotatedMethods(Server.class);
+ }
- private static ObjectAllocator ALLOCATOR = new ObjectAllocator() {
- public IRubyObject allocate(Ruby ruby, RubyClass rubyClass) {
- return new Server(ruby, rubyClass);
+ private static ObjectAllocator ALLOCATOR = new ObjectAllocator() {
+ public IRubyObject allocate(Ruby ruby, RubyClass rubyClass) {
+ return new Server(ruby, rubyClass);
+ }
+ };
+
+ public Server(Ruby ruby, RubyClass rubyClass) {
+ super(ruby, rubyClass);
}
- };
- public Server(Ruby ruby, RubyClass rubyClass) {
- super(ruby, rubyClass);
- vertx = VertxFactory.newVertx();
- httpServer = vertx.createHttpServer();
- }
+ /**
+ * Initialize jubilee server, take a rack application and a configuration hash as parameter
+ *
+ * @param context
+ * @param args
+ * @param block
+ * @return
+ */
+ @JRubyMethod(name = "initialize")
+ public IRubyObject initialize(ThreadContext context, IRubyObject app, IRubyObject config, Block block) {
+ Ruby runtime = getRuntime();
+ /* configuration keys */
+ RubyHash options = config.convertToHash();
+ RubySymbol port_k = runtime.newSymbol("Port");
+ RubySymbol host_k = runtime.newSymbol("Host");
+ RubySymbol cluster_port_k = runtime.newSymbol("cluster_port");
+ RubySymbol cluster_host_k = runtime.newSymbol("cluster_host");
+ RubySymbol ssl_k = runtime.newSymbol("ssl");
+ RubySymbol keystore_path_k = runtime.newSymbol("keystore_path");
+ RubySymbol keystore_password_k = runtime.newSymbol("keystore_password");
+ RubySymbol eventbus_prefix_k = runtime.newSymbol("eventbus_prefix");
+ RubySymbol number_of_workers_k = runtime.newSymbol("number_of_workers");
- /**
- * Initialize jubilee server, take a rack application and a configuration hash as parameter
- * @param context
- * @param args
- * @param block
- * @return
- */
- @JRubyMethod(name = "initialize")
- public IRubyObject initialize(ThreadContext context, IRubyObject app, IRubyObject config, Block block) {
- Ruby runtime = getRuntime();
- RubyHash options = config.convertToHash();
- RubySymbol port_k = runtime.newSymbol("Port");
- RubySymbol host_k = runtime.newSymbol("Host");
- RubySymbol ssl_k = runtime.newSymbol("ssl");
- RubySymbol keystore_path_k = runtime.newSymbol("keystore_path");
- RubySymbol keystore_password_k = runtime.newSymbol("keystore_password");
- RubySymbol eventbus_prefix_k = runtime.newSymbol("eventbus_prefix");
- RubySymbol number_of_workers_k = runtime.newSymbol("number_of_workers");
- this.port = RubyInteger.fix2int(options.op_aref(context, port_k));
- if (options.has_key_p(host_k).isTrue()) {
+ /* retrieve from passed in options */
+ this.port = Integer.parseInt(options.op_aref(context, port_k).toString());
this.host = options.op_aref(context, host_k).toString();
- } else {
- this.host = "0.0.0.0";
+
+ this.ssl = options.op_aref(context, ssl_k).isTrue();
+ this.numberOfWorkers = Integer.parseInt(options.op_aref(context, number_of_workers_k).toString());
+ if (options.has_key_p(keystore_path_k).isTrue()) {
+ this.keyStorePath = options.op_aref(context, keystore_path_k).toString();
+ this.keyStorePassword = options.op_aref(context, keystore_password_k).toString();
+ }
+ this.app = new RackApplication(app, this.ssl, this.numberOfWorkers);
+ if (options.has_key_p(eventbus_prefix_k).isTrue())
+ this.eventBusPrefix = options.op_aref(context, eventbus_prefix_k).toString();
+
+ /* init vertx */
+ if (options.has_key_p(cluster_host_k).isTrue()) {
+ this.clusterHost = options.op_aref(context, cluster_host_k).toString();
+ if (options.has_key_p(cluster_port_k).isTrue()) {
+ this.clusterPort = Integer.parseInt(options.op_aref(context, cluster_port_k).toString());
+ this.vertx = JubileeVertx.init(clusterPort, clusterHost);
+ }
+ this.vertx = JubileeVertx.init(clusterHost);
+ } else {
+ this.vertx = JubileeVertx.init();
+ }
+
+ httpServer = vertx.createHttpServer();
+ return this;
}
- this.ssl = options.op_aref(context, ssl_k).isTrue();
- if (options.has_key_p(number_of_workers_k).isTrue())
- this.numberOfWorkers = RubyInteger.fix2int(options.op_aref(context, number_of_workers_k));
- this.app = new RackApplication(app, this.ssl, this.numberOfWorkers);
- if (options.has_key_p(keystore_path_k).isTrue()) {
- this.keyStorePath = options.op_aref(context, keystore_path_k).toString();
- this.keyStorePassword = options.op_aref(context, keystore_password_k).toString();
+
+ /**
+ * Start http server, initialize states
+ *
+ * @param context
+ * @param block
+ * @return
+ */
+ @JRubyMethod(name = "start")
+ public IRubyObject start(final ThreadContext context, final Block block) {
+ httpServer.setAcceptBacklog(10000);
+ httpServer.requestHandler(new Handler<HttpServerRequest>() {
+ public void handle(final HttpServerRequest req) {
+ app.call(req);
+ }
+ });
+ if (eventBusPrefix != null) {
+ JsonObject config = new JsonObject().putString("prefix", eventBusPrefix);
+ JsonArray allowAll = new JsonArray();
+ allowAll.add(new JsonObject());
+ // TODO read inbounds and outbounds from config file
+ vertx.createSockJSServer(httpServer).bridge(config, allowAll, allowAll);
+ }
+ if (ssl) httpServer.setSSL(true).setKeyStorePath(this.keyStorePath)
+ .setKeyStorePassword(this.keyStorePassword);
+ httpServer.listen(this.port, this.host);
+ this.running = true;
+ return this;
}
- if (options.has_key_p(eventbus_prefix_k).isTrue()) {
- this.eventBusPrefix = options.op_aref(context, eventbus_prefix_k).toString();
- }
- running = false;
- return this;
- }
- /**
- * Start http server, initialize states
- * @param context
- * @param block
- * @return
- */
- @JRubyMethod(name = "start")
- public IRubyObject start(final ThreadContext context, final Block block) {
- this.running = true;
- httpServer.setAcceptBacklog(10000);
- httpServer.requestHandler(new Handler<HttpServerRequest>() {
- public void handle(final HttpServerRequest req) {
- app.call(req);
- }
- });
- if (eventBusPrefix != null) {
- JsonObject config = new JsonObject().putString("prefix", eventBusPrefix);
- // TODO read inbounds and outbounds from config file
- vertx.createSockJSServer(httpServer).bridge(config, new JsonArray(), new JsonArray());
+ /**
+ * Set timeout for keep alive connection
+ *
+ * @param context
+ * @param timeout (in TimeUnit.SECONDS)
+ * @return this
+ */
+ @JRubyMethod(name = "persistent_timeout=")
+ public IRubyObject setPersistentTimeout(final ThreadContext context, final IRubyObject timeout) {
+ // FIXME
+ //httpServer.setPersistentTimeout(RubyInteger.fix2long(timeout) * 1000);
+ return this;
}
- if (ssl) httpServer.setSSL(true).setKeyStorePath(this.keyStorePath)
- .setKeyStorePassword(this.keyStorePassword);
- httpServer.listen(this.port, this.host);
- return this;
- }
- /**
- * Set timeout for keep alive connection
- * @param context
- * @param timeout (in TimeUnit.SECONDS)
- * @return this
- */
- @JRubyMethod(name = "persistent_timeout=")
- public IRubyObject setPersistentTimeout(final ThreadContext context, final IRubyObject timeout) {
- // FIXME
- //httpServer.setPersistentTimeout(RubyInteger.fix2long(timeout) * 1000);
- return this;
- }
+ /**
+ * Stop the HttpServer
+ *
+ * @param context
+ * @param args if shutdown abruptly
+ * @param block callback on close
+ * @return
+ */
+ @JRubyMethod(name = {"stop", "close"}, optional = 1)
+ public IRubyObject close(ThreadContext context, IRubyObject[] args, Block block) {
+ if (running) {
+ if (args.length == 1)
+ app.shutdown(args[0].isTrue());
+ else
+ app.shutdown(false);
- /**
- * Stop the HttpServer
- * @param context
- * @param args if shutdown abruptly
- * @param block callback on close
- * @return
- */
- @JRubyMethod(name = {"stop", "close"}, optional = 1)
- public IRubyObject close(ThreadContext context, IRubyObject[] args, Block block) {
- if (running) {
- if (args.length == 1)
- app.shutdown(args[0].isTrue());
- else
- app.shutdown(false);
-
- this.running = false;
- httpServer.close();
- if (block.isGiven()) block.yieldSpecific(context);
- } else {
- getRuntime().getOutputStream().println("jubilee server not running?");
+ this.running = false;
+ httpServer.close();
+ if (block.isGiven()) block.yieldSpecific(context);
+ } else {
+ getRuntime().getOutputStream().println("jubilee server not running?");
+ }
+ return getRuntime().getNil();
}
- return getRuntime().getNil();
- }
}