require 'thread'
# Forwardable is used by Thread below (def_delegators); require it explicitly
# so this file does not depend on another file having loaded it first.
require 'forwardable'
require_relative 'mutex'
require_relative 'queue'

module LightIO::Library
  # Wrapper around the native ::ThreadGroup that can also hold
  # LightIO::Library::Thread instances, which are tracked in a separate
  # in-object array.
  class ThreadGroup
    include LightIO::Wrap::Wrapper
    wrap ::ThreadGroup

    Default = ThreadGroup._wrap(::ThreadGroup::Default)

    # Add a thread to this group.
    #
    # LightIO threads are asked to move themselves (via their private
    # add_to_group); anything else is handed to the wrapped group (@io).
    #
    # @param thread [LightIO::Library::Thread, Object]
    # @return [self]
    # @raise [ThreadError] if the wrapped group reports enclosed?
    def add(thread)
      if @io.enclosed?
        raise ThreadError, "can't move from the enclosed thread group"
      elsif thread.is_a?(LightIO::Library::Thread)
        # let thread decide how to add to group
        thread.send(:add_to_group, self)
      else
        @io.add(thread)
      end
      self
    end

    # @return [Array] members of the wrapped group followed by the LightIO
    #   threads registered on this wrapper
    def list
      @io.list + threads
    end

    private

    # LightIO threads that belong to this group (lazily created).
    def threads
      @threads ||= []
    end
  end

  # Thread-like object backed by a LightIO::Beam.
  class Thread
    RAW_THREAD = ::Thread

    # constants
    ThreadError = ::ThreadError
    Queue = LightIO::Library::Queue
    Backtrace = ::Thread::Backtrace
    SizedQueue = LightIO::Library::SizedQueue

    # Updated by on_transfer each time a thread's beam is transferred to.
    @current_thread = nil

    # Helpers that generate methods which warn and then forward the call to
    # another object.
    module FallbackHelper
      module ClassMethods
        # Define +method+ so that it emits +warning_text+ through warn and
        # then forwards the call, with its arguments, to +obj+.
        def fallback_method(obj, method, warning_text)
          define_method method do |*args|
            warn warning_text
            obj.public_send method, *args
          end
        end
      end

      include ClassMethods

      # Define each of +methods+ as a fallback to the object returned by
      # +main+ (the native main thread when mixed into Thread's singleton).
      def fallback_main_thread_methods(*methods)
        # The text is the same for every generated method, so build it once.
        warning = "This method is fallback to native main thread,"\
                  " it may cause unexpected behaviour,"\
                  " open issues on https://github.com/socketry/lightio/issues"\
                  " if this behaviour not approach you purpose"
        methods.each { |m| fallback_method(main, m, warning) }
      end

      def self.included(base)
        base.send :extend, ClassMethods
      end
    end

    class << self
      extend Forwardable
      def_delegators :'LightIO::Library::Thread::RAW_THREAD',
                     :DEBUG,
                     :DEBUG=,
                     :handle_interrupt,
                     :abort_on_exception,
                     :abort_on_exception=,
                     :pending_interrupt?

      include FallbackHelper

      # Create a thread without going through #initialize: the instance is
      # allocated and then set up with the private init_core.
      def fork(*args, &blk)
        obj = allocate
        obj.send(:init_core, *args, &blk)
        obj
      end

      alias start fork

      # Kill +thr+ by calling its #kill.
      def kill(thr)
        thr.kill
      end

      # @return the native main thread when running on the root fiber,
      #   otherwise the last LightIO thread recorded by on_transfer, falling
      #   back to the native current thread
      def current
        return main if LightIO::Core::LightFiber.is_root?(Fiber.current)
        @current_thread || RAW_THREAD.current
      end

      # Run the block while holding the class-level mutex.
      def exclusive(&blk)
        @thread_mutex.synchronize(&blk)
      end

      # @return [Array<LightIO::Library::Thread>] registered threads that are
      #   still alive
      def list
        thread_list = []
        threads.keys.each do |id|
          begin
            thr = ObjectSpace._id2ref(id)
            unless thr.alive?
              # manually remove thr from threads
              thr.kill
              next
            end
            thread_list << thr
          rescue RangeError
            # mean object is recycled
            # just wait ruby GC call finalizer to remove it from threads
            next
          end
        end
        thread_list
      end

      # Yield execution via LightIO::Beam.pass.
      def pass
        LightIO::Beam.pass
      end

      alias stop pass

      # Build the finalizer proc that drops +object_id+ from the registry.
      def finalizer(object_id)
        proc { threads.delete(object_id) }
      end

      # @return [::Thread] the native main thread
      def main
        RAW_THREAD.main
      end

      private

      # threads and threads variables
      # (Hash keyed by thread object_id; values are the per-thread variables)
      def threads
        @threads ||= {}
      end
    end

    extend Forwardable

    def initialize(*args, &blk)
      init_core(*args, &blk)
    end

    # Mutex used by Thread.exclusive.
    @thread_mutex = LightIO::Library::Mutex.new
    def_delegators :@beam, :alive?, :value

    fallback_main_thread_methods :abort_on_exception,
                                 :abort_on_exception=,
                                 :pending_interrupt?,
                                 :add_trace_func,
                                 :backtrace,
                                 :backtrace_locations,
                                 :priority,
                                 :priority=,
                                 :safe_level

    # Kill the underlying beam.
    #
    # @return [self] when the beam's kill returns a truthy value, otherwise
    #   that falsy value
    def kill
      @beam.kill && self
    end

    alias exit kill
    alias terminate kill

    # Status in the style of ::Thread#status.
    #
    # @return ['run'] when this is the current thread
    # @return ['sleep', 'aborting'] when alive, depending on whether the beam
    #   has recorded an error
    # @return [false, nil] when not alive: false without an error, nil with one
    def status
      if Thread.current == self
        'run'
      elsif alive?
        # 'aborting' is the spelling ::Thread#status uses.
        @beam.error.nil? ? 'sleep' : 'aborting'
      else
        @beam.error.nil? ? false : nil
      end
    end

    # @return [Array<Symbol>] names of the thread-local variables
    def thread_variables
      thread_values.keys
    end

    def thread_variable_get(name)
      thread_values[name.to_sym]
    end

    def thread_variable_set(name, value)
      thread_values[name.to_sym] = value
    end

    # @param key [String, Symbol]
    # @return [Boolean] whether a thread-local variable named +key+ is set
    def thread_variable?(key)
      # Values are stored under symbols, so normalise the key the same way
      # thread_variable_get/thread_variable_set do.
      thread_values.key?(key.to_sym)
    end

    # Read a fiber-local variable.
    def [](name)
      fiber_values[name.to_sym]
    end

    # Write a fiber-local variable.
    def []=(name, val)
      fiber_values[name.to_sym] = val
    end

    # @return the group this thread currently belongs to
    def group
      @group
    end

    # Conventional "#<Class:0xid status>" representation; a thread that is no
    # longer alive (status false/nil) is shown as "dead".
    def inspect
      "#<#{self.class.name}:0x#{object_id.to_s(16)} #{status || 'dead'}>"
    end

    # Join the underlying beam, optionally with a limit.
    #
    # @return [self] when the beam's join returns a truthy value, otherwise
    #   that falsy value
    def join(limit=nil)
      @beam.join(limit) && self
    end

    # @param sym [String, Symbol]
    # @return [Boolean] whether a fiber-local variable named +sym+ is set
    def key?(sym)
      # Fiber-local values are stored under symbols (see #[] and #[]=).
      fiber_values.key?(sym.to_sym)
    end

    # @return [Array<Symbol>] names of the fiber-local variables
    def keys
      fiber_values.keys
    end

    # Raise +exception+ in the beam, wrapped in LightIO::Beam::BeamError.
    def raise(exception, message=nil, backtrace=nil)
      @beam.raise(LightIO::Beam::BeamError.new(exception), message, backtrace)
    end

    # Yield execution via Thread.pass.
    #
    # Kernel.raise is used explicitly because #raise is overridden above.
    #
    # @raise [ThreadError] when the thread is no longer alive
    def run
      Kernel.raise ThreadError, 'killed thread' unless alive?
      Thread.pass
    end

    alias wakeup run

    # @return [Boolean] true when the thread is not alive or is sleeping
    def stop?
      !alive? || status == 'sleep'
    end

    private

    # Shared set-up used by both #initialize and Thread.fork.
    def init_core(*args, &blk)
      @beam = LightIO::Beam.new(*args, &blk)
      @beam.on_dead = proc { on_dead }
      @beam.on_transfer = proc { |from, to| on_transfer(from, to) }
      # register this thread
      thread_values
      # add self to ThreadGroup::Default
      add_to_group(ThreadGroup::Default)
      # remove thread and thread variables
      ObjectSpace.define_finalizer(self, self.class.finalizer(self.object_id))
    end

    # add self to thread group
    def add_to_group(group)
      # remove from old group
      remove_from_group
      @group = group
      @group.send(:threads) << self
    end

    # remove thread from group when dead
    def remove_from_group
      @group.send(:threads).delete(self) if @group
    end

    # Callback installed on the beam as on_dead.
    def on_dead
      # release references
      remove_from_group
    end

    # Callback installed on the beam as on_transfer; records this thread as
    # the class-level current thread. The arguments are not used.
    def on_transfer(_from, _to)
      Thread.instance_variable_set(:@current_thread, self)
    end

    # Thread-local variables, stored in the class-level registry under this
    # object's object_id. Creating the entry also registers the thread.
    def thread_values
      Thread.send(:threads)[object_id] ||= {}
    end

    # Maps a beam or fiber to its fiber-local variables.
    def fibers_and_values
      @fibers_and_values ||= {}
    end

    # Fiber-local variables for the current execution context.
    def fiber_values
      beam_or_fiber = LightIO::Beam.current
      # only consider non-root fiber
      if !beam_or_fiber.instance_of?(::Fiber) || LightIO::LightFiber.is_root?(beam_or_fiber)
        beam_or_fiber = @beam
      end
      fibers_and_values[beam_or_fiber] ||= {}
    end
  end
end