lib/lightio/library/thread.rb in lightio-0.3.2 vs lib/lightio/library/thread.rb in lightio-0.4.0.pre
- old
+ new
@@ -1,177 +1,78 @@
require 'thread'
-require_relative 'mutex'
require_relative 'queue'
module LightIO::Library
class ThreadGroup
+ include Base
include LightIO::Wrap::Wrapper
- wrap ::ThreadGroup
+ mock ::ThreadGroup
- Default = ThreadGroup._wrap(::ThreadGroup::Default)
-
def add(thread)
- if @io.enclosed?
+ if @obj.enclosed?
raise ThreadError, "can't move from the enclosed thread group"
elsif thread.is_a?(LightIO::Library::Thread)
# let thread decide how to add to group
thread.send(:add_to_group, self)
else
- @io.add(thread)
+ @obj.add(thread)
end
self
end
def list
- @io.list + threads
+ @obj.list + threads
end
private
def threads
@threads ||= []
end
+
+ Default = ThreadGroup._wrap(::ThreadGroup::Default)
end
class Thread
- RAW_THREAD = ::Thread
-
# constants
ThreadError = ::ThreadError
Queue = LightIO::Library::Queue
Backtrace = ::Thread::Backtrace
SizedQueue = LightIO::Library::SizedQueue
- @current_thread = nil
+ extend Base::MockMethods
+ mock ::Thread
- module FallbackHelper
- module ClassMethods
- def fallback_method(obj, method, warning_text)
- define_method method do |*args|
- warn warning_text
- obj.public_send method, *args
- end
- end
- end
-
- include ClassMethods
-
- def fallback_main_thread_methods(*methods)
- methods.each {|m| fallback_method(main, m, "This method is fallback to native main thread,"\
- " it may cause unexpected behaviour,"\
- " open issues on https://github.com/socketry/lightio/issues"\
- " if this behaviour not approach you purpose")}
- end
-
- def self.included(base)
- base.send :extend, ClassMethods
- end
- end
-
- class << self
- extend Forwardable
- def_delegators :'LightIO::Library::Thread::RAW_THREAD',
- :DEBUG,
- :DEBUG=,
- :handle_interrupt,
- :abort_on_exception,
- :abort_on_exception=,
- :pending_interrupt?
-
- include FallbackHelper
-
- def fork(*args, &blk)
- obj = allocate
- obj.send(:init_core, *args, &blk)
- obj
- end
-
- alias start fork
-
- def kill(thr)
- thr.kill
- end
-
- def current
- return main if LightIO::Core::LightFiber.is_root?(Fiber.current)
- @current_thread || RAW_THREAD.current
- end
-
- def exclusive(&blk)
- @thread_mutex.synchronize(&blk)
- end
-
- def list
- thread_list = []
- threads.keys.each {|id|
- begin
- thr = ObjectSpace._id2ref(id)
- unless thr.alive?
- # manually remove thr from threads
- thr.kill
- next
- end
- thread_list << thr
- rescue RangeError
- # mean object is recycled
- # just wait ruby GC call finalizer to remove it from threads
- next
- end
- }
- thread_list
- end
-
- def pass
- LightIO::Beam.pass
- end
-
- alias stop pass
-
- def finalizer(object_id)
- proc {threads.delete(object_id)}
- end
-
- def main
- RAW_THREAD.main
- end
-
- private
-
- # threads and threads variables
- def threads
- @threads ||= {}
- end
- end
-
+ extend LightIO::Module::Thread::ClassMethods
extend Forwardable
def initialize(*args, &blk)
init_core(*args, &blk)
end
- @thread_mutex = LightIO::Library::Mutex.new
def_delegators :@beam, :alive?, :value
- fallback_main_thread_methods :abort_on_exception,
- :abort_on_exception=,
- :pending_interrupt?,
- :add_trace_func,
- :backtrace,
- :backtrace_locations,
- :priority,
- :priority=,
- :safe_level
+ def_delegators :"Thread.main",
+ :abort_on_exception,
+ :abort_on_exception=,
+ :pending_interrupt?,
+ :add_trace_func,
+ :backtrace,
+ :backtrace_locations,
+ :priority,
+ :priority=,
+ :safe_level
def kill
@beam.kill && self
end
alias exit kill
alias terminate kill
def status
- if Thread.current == self
+ if self.class.current == self
'run'
elsif alive?
@beam.error.nil? ? 'sleep' : 'abouting'
else
@beam.error.nil? ? false : nil
@@ -243,13 +144,13 @@
@beam.on_dead = proc {on_dead}
@beam.on_transfer = proc {|from, to| on_transfer(from, to)}
# register this thread
thread_values
# add self to ThreadGroup::Default
- add_to_group(ThreadGroup::Default)
+ add_to_group(LightIO::Library::ThreadGroup::Default)
# remove thread and thread variables
- ObjectSpace.define_finalizer(self, self.class.finalizer(self.object_id))
+ ObjectSpace.define_finalizer(self, LightIO::Library::Thread.finalizer(self.object_id))
end
# add self to thread group
def add_to_group(group)
# remove from old group
@@ -286,7 +187,137 @@
if !beam_or_fiber.instance_of?(::Fiber) || LightIO::LightFiber.is_root?(beam_or_fiber)
beam_or_fiber = @beam
end
fibers_and_values[beam_or_fiber] ||= {}
end
+
+ class << self
+ extend Forwardable
+ def_delegators :'::Thread',
+ :DEBUG,
+ :DEBUG=,
+ :handle_interrupt,
+ :abort_on_exception,
+ :abort_on_exception=,
+ :pending_interrupt?
+
+ def method_missing(*args)
+ ::Thread.__send__(*args)
+ end
+
+ def respond_to?(*args)
+ ::Thread.respond_to?(*args)
+ end
+
+ def respond_to_missing?(method, *)
+ ::Thread.respond_to?(method)
+ end
+
+ private
+
+ # threads and threads variables
+ def threads
+ thrs = Thread.instance_variable_get(:@threads)
+ thrs || Thread.instance_variable_set(:@threads, {})
+ end
+
+ def thread_mutex
+ mutex = Thread.instance_variable_get(:@thread_mutex)
+ mutex || Thread.instance_variable_set(:@thread_mutex, LightIO::Library::Mutex.new)
+ end
+ end
+
+ class Mutex
+ extend Base::MockMethods
+ mock ::Mutex
+
+ def initialize
+ @queue = LightIO::Library::Queue.new
+ @queue << true
+ @locked_thread = nil
+ end
+
+ def lock
+ raise ThreadError, "deadlock; recursive locking" if owned?
+ @queue.pop
+ @locked_thread = LightIO::Thread.current
+ self
+ end
+
+ def unlock
+ raise ThreadError, "Attempt to unlock a mutex which is not locked" unless owned?
+ @locked_thread = nil
+ @queue << true
+ self
+ end
+
+ def locked?
+ !@locked_thread.nil?
+ end
+
+ def owned?
+ @locked_thread == LightIO::Thread.current
+ end
+
+ def sleep(timeout=nil)
+ unlock
+ LightIO.sleep(timeout)
+ lock
+ end
+
+ def synchronize
+ raise ThreadError, 'must be called with a block' unless block_given?
+ lock
+ begin
+ yield
+ ensure
+ unlock
+ end
+ end
+
+ def try_lock
+ if @locked_thread.nil?
+ lock
+ true
+ else
+ false
+ end
+ end
+ end
+
+ class ConditionVariable
+ extend Base::MockMethods
+ mock ::ConditionVariable
+
+ def initialize
+ @queue = LightIO::Library::Queue.new
+ end
+
+
+ def broadcast
+ signal until @queue.num_waiting == 0
+ self
+ end
+
+ def signal
+ @queue << true unless @queue.num_waiting == 0
+ self
+ end
+
+ def wait(mutex, timeout=nil)
+ mutex.unlock
+ begin
+ LightIO::Library::Timeout.timeout(timeout) do
+ @queue.pop
+ end
+ rescue Timeout::Error
+ nil
+ end
+ mutex.lock
+ self
+ end
+ end
end
+
+ Mutex = Thread::Mutex
+ ConditionVariable = Thread::ConditionVariable
end