ext/event.c in nyara-0.0.1.pre.6 vs ext/event.c in nyara-0.0.1.pre.8

- old
+ new

@@ -16,10 +16,11 @@ #define ETYPE_CAN_ACCEPT 0 #define ETYPE_HANDLE_REQUEST 1 #define ETYPE_CONNECT 2 #define MAX_E 1024 static void loop_body(int fd, int etype); +static void loop_check(); static int qfd = 0; #define MAX_RECEIVE_DATA 65536 * 2 static char received_data[MAX_RECEIVE_DATA]; extern http_parser_settings nyara_request_parse_settings; @@ -36,18 +37,36 @@ static VALUE sym_term_close; static VALUE sym_writing; static VALUE sym_reading; static VALUE sym_sleep; static Request* curr_request; +static VALUE to_resume_actions; +static bool graceful_quit = false; static VALUE _fiber_func(VALUE _, VALUE args) { VALUE instance = rb_ary_pop(args); VALUE meth = rb_ary_pop(args); rb_apply(instance, SYM2ID(meth), args); return Qnil; } +static void _resume_action(Request* p) { + VALUE state = rb_fiber_resume(p->fiber, 0, NULL); + if (state == Qnil) { // _fiber_func always returns Qnil + // terminated (todo log raised error ?) + nyara_request_term_close(p->self); + } else if (state == sym_term_close) { + nyara_request_term_close(p->self); + } else if (state == sym_writing) { + // do nothing + } else if (state == sym_reading) { + // do nothing + } else if (state == sym_sleep) { + // do nothing + } +} + static void _handle_request(VALUE request) { Request* p; Data_Get_Struct(request, Request, p); if (p->sleeping) { return; @@ -90,31 +109,23 @@ p->format = result.format; p->response_header = rb_class_new_instance(0, NULL, nyara_header_hash_class); p->response_header_extra_lines = rb_ary_new(); nyara_request_init_env(request); } else { - rb_funcall(p->self, id_not_found, 0); + static const char* not_found = "HTTP/1.1 404 Not Found\r\nConnection: close\r\nContent-Length: 0\r\n\r\n"; + static long not_found_len = 0; + if (!not_found_len) { + not_found_len = strlen(not_found); + } + nyara_send_data(p->fd, not_found, not_found_len); nyara_detach_fd(p->fd); p->fd = 0; return; } } - // resume action - VALUE state = rb_fiber_resume(p->fiber, 0, NULL); - if (state == Qnil) { // _fiber_func always returns Qnil - // terminated (todo log raised error ?) - nyara_request_term_close(request); - } else if (state == sym_term_close) { - nyara_request_term_close(request); - } else if (state == sym_writing) { - // do nothing - } else if (state == sym_reading) { - // do nothing - } else if (state == sym_sleep) { - // do nothing - } + _resume_action(p); } // platform independent, invoked by LOOP_E() static void loop_body(int fd, int etype) { switch (etype) { @@ -143,10 +154,49 @@ } } } } +static void loop_check() { + // execute other thread / interrupts + rb_thread_schedule(); + + // wakeup actions which finished sleeping + long len = RARRAY_LEN(to_resume_actions); + if (len) { + VALUE* ptr = RARRAY_PTR(to_resume_actions); + for (long i = 0; i < len; i++) { + VALUE request = ptr[i]; + Request* p; + Data_Get_Struct(request, Request, p); + + p->sleeping = false; + if (!rb_fiber_alive_p(p->fiber)) { + continue; + } + + _resume_action(p); + if (qfd) { + VALUE* v_fds = RARRAY_PTR(p->watched_fds); + long v_fds_len = RARRAY_LEN(p->watched_fds); + for (long i = 0; i < v_fds_len; i++) { + ADD_E(FIX2INT(v_fds[i]), ETYPE_CONNECT); + } + ADD_E(p->fd, ETYPE_HANDLE_REQUEST); + } else { + // we are in a test, no queue + } + } + } + + if (graceful_quit) { + if (RTEST(rb_funcall(fd_request_map, rb_intern("empty?"), 0))) { + _Exit(0); + } + } +} + void nyara_detach_fd(int fd) { VALUE request = rb_hash_delete(fd_request_map, INT2FIX(fd)); if (request != Qnil) { Request* p; Data_Get_Struct(request, Request, p); @@ -162,19 +212,29 @@ static VALUE ext_init_queue(VALUE _) { INIT_E(); return Qnil; } -static VALUE ext_run_queue(VALUE _, VALUE v_fd) { - int fd = FIX2INT(v_fd); +// run queue loop with server_fd +static VALUE ext_run_queue(VALUE _, VALUE v_server_fd) { + int fd = FIX2INT(v_server_fd); nyara_set_nonblock(fd); ADD_E(fd, ETYPE_CAN_ACCEPT); LOOP_E(); return Qnil; } +// set graceful quit flag and do not accept server_fd anymore +static VALUE ext_graceful_quit(VALUE _, VALUE v_server_fd) { + graceful_quit = true; + int fd = FIX2INT(v_server_fd); + DEL_E(fd); + return Qnil; +} + +// put request into sleep static VALUE ext_request_sleep(VALUE _, VALUE request) { Request* p; Data_Get_Struct(request, Request, p); p->sleeping = true; @@ -190,27 +250,14 @@ } DEL_E(p->fd); return Qnil; } +// NOTE this will be executed in another thread, resuming fiber in a non-main thread will stuck static VALUE ext_request_wakeup(VALUE _, VALUE request) { // NOTE should not use curr_request - Request* p; - Data_Get_Struct(request, Request, p); - - p->sleeping = false; - if (!qfd) { - // we are in a test - return Qnil; - } - - VALUE* v_fds = RARRAY_PTR(p->watched_fds); - long v_fds_len = RARRAY_LEN(p->watched_fds); - for (long i = 0; i < v_fds_len; i++) { - ADD_E(FIX2INT(v_fds[i]), ETYPE_CONNECT); - } - ADD_E(p->fd, ETYPE_HANDLE_REQUEST); + rb_ary_push(to_resume_actions, request); return Qnil; } static VALUE ext_set_nonblock(VALUE _, VALUE v_fd) { int fd = FIX2INT(v_fd); @@ -328,11 +375,15 @@ sym_term_close = ID2SYM(rb_intern("term_close")); sym_writing = ID2SYM(rb_intern("writing")); sym_reading = ID2SYM(rb_intern("reading")); sym_sleep = ID2SYM(rb_intern("sleep")); + to_resume_actions = rb_ary_new(); + rb_gc_register_mark_object(to_resume_actions); + rb_define_singleton_method(ext, "init_queue", ext_init_queue, 0); rb_define_singleton_method(ext, "run_queue", ext_run_queue, 1); + rb_define_singleton_method(ext, "graceful_quit", ext_graceful_quit, 1); rb_define_singleton_method(ext, "request_sleep", ext_request_sleep, 1); rb_define_singleton_method(ext, "request_wakeup", ext_request_wakeup, 1); // fd operations