ext/gyro/thread.c in polyphony-0.30 vs ext/gyro/thread.c in polyphony-0.31

- old
+ new

@@ -127,10 +127,43 @@ } } return self; } +VALUE Thread_schedule_fiber_with_priority(VALUE self, VALUE fiber, VALUE value) { + if (rb_fiber_alive_p(fiber) != Qtrue) { + return self; + } + FIBER_TRACE(3, SYM_fiber_schedule, fiber, value); + rb_ivar_set(fiber, ID_runnable_value, value); + + VALUE queue = rb_ivar_get(self, ID_run_queue); + + // if fiber is already scheduled, remove it from the run queue + if (rb_ivar_get(fiber, ID_runnable) != Qnil) { + rb_ary_delete(queue, fiber); + } else { + rb_ivar_set(fiber, ID_runnable, Qtrue); + } + + // the fiber is given priority by putting it at the front of the run queue + rb_ary_unshift(queue, fiber); + + if (rb_thread_current() != self) { + // if the fiber scheduling is done across threads, we need to make sure the + // target thread is woken up in case it is in the middle of running its + // event selector. Otherwise it's gonna be stuck waiting for an event to + // happen, not knowing that it there's already a fiber ready to run in its + // run queue. + VALUE selector = rb_ivar_get(self, ID_ivar_event_selector); + if (selector != Qnil) { + Gyro_Selector_break_out_of_ev_loop(selector); + } + } + return self; +} + VALUE Thread_switch_fiber(VALUE self) { VALUE current_fiber = rb_fiber_current(); if (__tracing_enabled__) { if (rb_ivar_get(current_fiber, ID_ivar_running) != Qfalse) { rb_funcall(rb_cObject, ID_fiber_trace, 2, SYM_fiber_switchpoint, current_fiber); @@ -200,11 +233,11 @@ return rb_ivar_get(rb_thread_current(), ID_ivar_event_selector); } VALUE Thread_fiber_break_out_of_ev_loop(VALUE self, VALUE resume_obj) { VALUE selector = rb_ivar_get(self, ID_ivar_event_selector); - Thread_schedule_fiber(self, rb_fiber_current(), resume_obj); + Thread_schedule_fiber_with_priority(self, rb_fiber_current(), resume_obj); if (Gyro_Selector_break_out_of_ev_loop(selector) == Qnil) { // we're not inside the ev_loop, so we just do a switchpoint Thread_switch_fiber(self); } @@ -241,9 +274,11 @@ rb_define_method(rb_cThread, "reset_fiber_scheduling", Thread_reset_fiber_scheduling, 0); rb_define_method(rb_cThread, "fiber_scheduling_stats", Thread_fiber_scheduling_stats, 0); rb_define_method(rb_cThread, "break_out_of_ev_loop", Thread_fiber_break_out_of_ev_loop, 1); rb_define_method(rb_cThread, "schedule_fiber", Thread_schedule_fiber, 2); + rb_define_method(rb_cThread, "schedule_fiber_with_priority", + Thread_schedule_fiber_with_priority, 2); rb_define_method(rb_cThread, "switch_fiber", Thread_switch_fiber, 0); rb_define_method(rb_cThread, "join_perform", Thread_join_perform, 0); ID_create_event_selector = rb_intern("create_event_selector");