ext/event.c in nyara-0.0.1.pre.6 vs ext/event.c in nyara-0.0.1.pre.8
- old
+ new
@@ -16,10 +16,11 @@
#define ETYPE_CAN_ACCEPT 0
#define ETYPE_HANDLE_REQUEST 1
#define ETYPE_CONNECT 2
#define MAX_E 1024
static void loop_body(int fd, int etype);
+static void loop_check();
static int qfd = 0;
#define MAX_RECEIVE_DATA 65536 * 2
static char received_data[MAX_RECEIVE_DATA];
extern http_parser_settings nyara_request_parse_settings;
@@ -36,18 +37,36 @@
static VALUE sym_term_close;
static VALUE sym_writing;
static VALUE sym_reading;
static VALUE sym_sleep;
static Request* curr_request;
+static VALUE to_resume_actions;
+static bool graceful_quit = false;
static VALUE _fiber_func(VALUE _, VALUE args) {
VALUE instance = rb_ary_pop(args);
VALUE meth = rb_ary_pop(args);
rb_apply(instance, SYM2ID(meth), args);
return Qnil;
}
+static void _resume_action(Request* p) {
+ VALUE state = rb_fiber_resume(p->fiber, 0, NULL);
+ if (state == Qnil) { // _fiber_func always returns Qnil
+ // terminated (todo log raised error ?)
+ nyara_request_term_close(p->self);
+ } else if (state == sym_term_close) {
+ nyara_request_term_close(p->self);
+ } else if (state == sym_writing) {
+ // do nothing
+ } else if (state == sym_reading) {
+ // do nothing
+ } else if (state == sym_sleep) {
+ // do nothing
+ }
+}
+
static void _handle_request(VALUE request) {
Request* p;
Data_Get_Struct(request, Request, p);
if (p->sleeping) {
return;
@@ -90,31 +109,23 @@
p->format = result.format;
p->response_header = rb_class_new_instance(0, NULL, nyara_header_hash_class);
p->response_header_extra_lines = rb_ary_new();
nyara_request_init_env(request);
} else {
- rb_funcall(p->self, id_not_found, 0);
+ static const char* not_found = "HTTP/1.1 404 Not Found\r\nConnection: close\r\nContent-Length: 0\r\n\r\n";
+ static long not_found_len = 0;
+ if (!not_found_len) {
+ not_found_len = strlen(not_found);
+ }
+ nyara_send_data(p->fd, not_found, not_found_len);
nyara_detach_fd(p->fd);
p->fd = 0;
return;
}
}
- // resume action
- VALUE state = rb_fiber_resume(p->fiber, 0, NULL);
- if (state == Qnil) { // _fiber_func always returns Qnil
- // terminated (todo log raised error ?)
- nyara_request_term_close(request);
- } else if (state == sym_term_close) {
- nyara_request_term_close(request);
- } else if (state == sym_writing) {
- // do nothing
- } else if (state == sym_reading) {
- // do nothing
- } else if (state == sym_sleep) {
- // do nothing
- }
+ _resume_action(p);
}
// platform independent, invoked by LOOP_E()
static void loop_body(int fd, int etype) {
switch (etype) {
@@ -143,10 +154,49 @@
}
}
}
}
+static void loop_check() {
+ // execute other thread / interrupts
+ rb_thread_schedule();
+
+ // wakeup actions which finished sleeping
+ long len = RARRAY_LEN(to_resume_actions);
+ if (len) {
+ VALUE* ptr = RARRAY_PTR(to_resume_actions);
+ for (long i = 0; i < len; i++) {
+ VALUE request = ptr[i];
+ Request* p;
+ Data_Get_Struct(request, Request, p);
+
+ p->sleeping = false;
+ if (!rb_fiber_alive_p(p->fiber)) {
+ continue;
+ }
+
+ _resume_action(p);
+ if (qfd) {
+ VALUE* v_fds = RARRAY_PTR(p->watched_fds);
+ long v_fds_len = RARRAY_LEN(p->watched_fds);
+ for (long i = 0; i < v_fds_len; i++) {
+ ADD_E(FIX2INT(v_fds[i]), ETYPE_CONNECT);
+ }
+ ADD_E(p->fd, ETYPE_HANDLE_REQUEST);
+ } else {
+ // we are in a test, no queue
+ }
+ }
+ }
+
+ if (graceful_quit) {
+ if (RTEST(rb_funcall(fd_request_map, rb_intern("empty?"), 0))) {
+ _Exit(0);
+ }
+ }
+}
+
void nyara_detach_fd(int fd) {
VALUE request = rb_hash_delete(fd_request_map, INT2FIX(fd));
if (request != Qnil) {
Request* p;
Data_Get_Struct(request, Request, p);
@@ -162,19 +212,29 @@
static VALUE ext_init_queue(VALUE _) {
INIT_E();
return Qnil;
}
-static VALUE ext_run_queue(VALUE _, VALUE v_fd) {
- int fd = FIX2INT(v_fd);
+// run queue loop with server_fd
+static VALUE ext_run_queue(VALUE _, VALUE v_server_fd) {
+ int fd = FIX2INT(v_server_fd);
nyara_set_nonblock(fd);
ADD_E(fd, ETYPE_CAN_ACCEPT);
LOOP_E();
return Qnil;
}
+// set graceful quit flag and do not accept server_fd anymore
+static VALUE ext_graceful_quit(VALUE _, VALUE v_server_fd) {
+ graceful_quit = true;
+ int fd = FIX2INT(v_server_fd);
+ DEL_E(fd);
+ return Qnil;
+}
+
+// put request into sleep
static VALUE ext_request_sleep(VALUE _, VALUE request) {
Request* p;
Data_Get_Struct(request, Request, p);
p->sleeping = true;
@@ -190,27 +250,14 @@
}
DEL_E(p->fd);
return Qnil;
}
+// NOTE this will be executed in another thread, resuming fiber in a non-main thread will stuck
static VALUE ext_request_wakeup(VALUE _, VALUE request) {
// NOTE should not use curr_request
- Request* p;
- Data_Get_Struct(request, Request, p);
-
- p->sleeping = false;
- if (!qfd) {
- // we are in a test
- return Qnil;
- }
-
- VALUE* v_fds = RARRAY_PTR(p->watched_fds);
- long v_fds_len = RARRAY_LEN(p->watched_fds);
- for (long i = 0; i < v_fds_len; i++) {
- ADD_E(FIX2INT(v_fds[i]), ETYPE_CONNECT);
- }
- ADD_E(p->fd, ETYPE_HANDLE_REQUEST);
+ rb_ary_push(to_resume_actions, request);
return Qnil;
}
static VALUE ext_set_nonblock(VALUE _, VALUE v_fd) {
int fd = FIX2INT(v_fd);
@@ -328,11 +375,15 @@
sym_term_close = ID2SYM(rb_intern("term_close"));
sym_writing = ID2SYM(rb_intern("writing"));
sym_reading = ID2SYM(rb_intern("reading"));
sym_sleep = ID2SYM(rb_intern("sleep"));
+ to_resume_actions = rb_ary_new();
+ rb_gc_register_mark_object(to_resume_actions);
+
rb_define_singleton_method(ext, "init_queue", ext_init_queue, 0);
rb_define_singleton_method(ext, "run_queue", ext_run_queue, 1);
+ rb_define_singleton_method(ext, "graceful_quit", ext_graceful_quit, 1);
rb_define_singleton_method(ext, "request_sleep", ext_request_sleep, 1);
rb_define_singleton_method(ext, "request_wakeup", ext_request_wakeup, 1);
// fd operations