lib/lightio/library/thread.rb in lightio-0.3.2 vs lib/lightio/library/thread.rb in lightio-0.4.0.pre

- old
+ new

@@ -1,177 +1,78 @@ require 'thread' -require_relative 'mutex' require_relative 'queue' module LightIO::Library class ThreadGroup + include Base include LightIO::Wrap::Wrapper - wrap ::ThreadGroup + mock ::ThreadGroup - Default = ThreadGroup._wrap(::ThreadGroup::Default) - def add(thread) - if @io.enclosed? + if @obj.enclosed? raise ThreadError, "can't move from the enclosed thread group" elsif thread.is_a?(LightIO::Library::Thread) # let thread decide how to add to group thread.send(:add_to_group, self) else - @io.add(thread) + @obj.add(thread) end self end def list - @io.list + threads + @obj.list + threads end private def threads @threads ||= [] end + + Default = ThreadGroup._wrap(::ThreadGroup::Default) end class Thread - RAW_THREAD = ::Thread - # constants ThreadError = ::ThreadError Queue = LightIO::Library::Queue Backtrace = ::Thread::Backtrace SizedQueue = LightIO::Library::SizedQueue - @current_thread = nil + extend Base::MockMethods + mock ::Thread - module FallbackHelper - module ClassMethods - def fallback_method(obj, method, warning_text) - define_method method do |*args| - warn warning_text - obj.public_send method, *args - end - end - end - - include ClassMethods - - def fallback_main_thread_methods(*methods) - methods.each {|m| fallback_method(main, m, "This method is fallback to native main thread,"\ - " it may cause unexpected behaviour,"\ - " open issues on https://github.com/socketry/lightio/issues"\ - " if this behaviour not approach you purpose")} - end - - def self.included(base) - base.send :extend, ClassMethods - end - end - - class << self - extend Forwardable - def_delegators :'LightIO::Library::Thread::RAW_THREAD', - :DEBUG, - :DEBUG=, - :handle_interrupt, - :abort_on_exception, - :abort_on_exception=, - :pending_interrupt? - - include FallbackHelper - - def fork(*args, &blk) - obj = allocate - obj.send(:init_core, *args, &blk) - obj - end - - alias start fork - - def kill(thr) - thr.kill - end - - def current - return main if LightIO::Core::LightFiber.is_root?(Fiber.current) - @current_thread || RAW_THREAD.current - end - - def exclusive(&blk) - @thread_mutex.synchronize(&blk) - end - - def list - thread_list = [] - threads.keys.each {|id| - begin - thr = ObjectSpace._id2ref(id) - unless thr.alive? - # manually remove thr from threads - thr.kill - next - end - thread_list << thr - rescue RangeError - # mean object is recycled - # just wait ruby GC call finalizer to remove it from threads - next - end - } - thread_list - end - - def pass - LightIO::Beam.pass - end - - alias stop pass - - def finalizer(object_id) - proc {threads.delete(object_id)} - end - - def main - RAW_THREAD.main - end - - private - - # threads and threads variables - def threads - @threads ||= {} - end - end - + extend LightIO::Module::Thread::ClassMethods extend Forwardable def initialize(*args, &blk) init_core(*args, &blk) end - @thread_mutex = LightIO::Library::Mutex.new def_delegators :@beam, :alive?, :value - fallback_main_thread_methods :abort_on_exception, - :abort_on_exception=, - :pending_interrupt?, - :add_trace_func, - :backtrace, - :backtrace_locations, - :priority, - :priority=, - :safe_level + def_delegators :"Thread.main", + :abort_on_exception, + :abort_on_exception=, + :pending_interrupt?, + :add_trace_func, + :backtrace, + :backtrace_locations, + :priority, + :priority=, + :safe_level def kill @beam.kill && self end alias exit kill alias terminate kill def status - if Thread.current == self + if self.class.current == self 'run' elsif alive? @beam.error.nil? ? 'sleep' : 'abouting' else @beam.error.nil? ? false : nil @@ -243,13 +144,13 @@ @beam.on_dead = proc {on_dead} @beam.on_transfer = proc {|from, to| on_transfer(from, to)} # register this thread thread_values # add self to ThreadGroup::Default - add_to_group(ThreadGroup::Default) + add_to_group(LightIO::Library::ThreadGroup::Default) # remove thread and thread variables - ObjectSpace.define_finalizer(self, self.class.finalizer(self.object_id)) + ObjectSpace.define_finalizer(self, LightIO::Library::Thread.finalizer(self.object_id)) end # add self to thread group def add_to_group(group) # remove from old group @@ -286,7 +187,137 @@ if !beam_or_fiber.instance_of?(::Fiber) || LightIO::LightFiber.is_root?(beam_or_fiber) beam_or_fiber = @beam end fibers_and_values[beam_or_fiber] ||= {} end + + class << self + extend Forwardable + def_delegators :'::Thread', + :DEBUG, + :DEBUG=, + :handle_interrupt, + :abort_on_exception, + :abort_on_exception=, + :pending_interrupt? + + def method_missing(*args) + ::Thread.__send__(*args) + end + + def respond_to?(*args) + ::Thread.respond_to?(*args) + end + + def respond_to_missing?(method, *) + ::Thread.respond_to?(method) + end + + private + + # threads and threads variables + def threads + thrs = Thread.instance_variable_get(:@threads) + thrs || Thread.instance_variable_set(:@threads, {}) + end + + def thread_mutex + mutex = Thread.instance_variable_get(:@thread_mutex) + mutex || Thread.instance_variable_set(:@thread_mutex, LightIO::Library::Mutex.new) + end + end + + class Mutex + extend Base::MockMethods + mock ::Mutex + + def initialize + @queue = LightIO::Library::Queue.new + @queue << true + @locked_thread = nil + end + + def lock + raise ThreadError, "deadlock; recursive locking" if owned? + @queue.pop + @locked_thread = LightIO::Thread.current + self + end + + def unlock + raise ThreadError, "Attempt to unlock a mutex which is not locked" unless owned? + @locked_thread = nil + @queue << true + self + end + + def locked? + !@locked_thread.nil? + end + + def owned? + @locked_thread == LightIO::Thread.current + end + + def sleep(timeout=nil) + unlock + LightIO.sleep(timeout) + lock + end + + def synchronize + raise ThreadError, 'must be called with a block' unless block_given? + lock + begin + yield + ensure + unlock + end + end + + def try_lock + if @locked_thread.nil? + lock + true + else + false + end + end + end + + class ConditionVariable + extend Base::MockMethods + mock ::ConditionVariable + + def initialize + @queue = LightIO::Library::Queue.new + end + + + def broadcast + signal until @queue.num_waiting == 0 + self + end + + def signal + @queue << true unless @queue.num_waiting == 0 + self + end + + def wait(mutex, timeout=nil) + mutex.unlock + begin + LightIO::Library::Timeout.timeout(timeout) do + @queue.pop + end + rescue Timeout::Error + nil + end + mutex.lock + self + end + end end + + Mutex = Thread::Mutex + ConditionVariable = Thread::ConditionVariable end