# frozen_string_literal: true

export_default :Coprocess

import '../extensions/core'
Exceptions = import './exceptions'

# Encapsulates an asynchronous task. A coprocess wraps a block and runs it
# inside its own fiber; the block's return value (or the exception it raised)
# is kept in #result and handed to whichever fiber is awaiting the coprocess.
#
# NOTE(review): Fiber#schedule, Fiber#coprocess=, Fiber.root, Gyro, snooze and
# suspend are not defined in this file (presumably they come from
# '../extensions/core') - confirm their exact semantics there.
class Coprocess
  # Inter-coprocess message passing. Messages sent while the receiver is not
  # blocked in #receive are buffered in @queued_messages in arrival order.
  module Messaging
    # Sends a message to this coprocess.
    #
    # If the coprocess is currently blocked in #receive and still has a fiber,
    # the value is passed straight to that fiber via Fiber#schedule; otherwise
    # it is appended to the message queue. The sender then calls snooze.
    #
    # @param value [Object] the message
    # @return whatever snooze returns
    def <<(value)
      if @receive_waiting && @fiber
        @fiber.schedule value
      else
        @queued_messages ||= []
        @queued_messages << value
      end
      snooze
    end

    # Returns the next message. If no message is queued, blocks in
    # #wait_for_message; otherwise removes the oldest queued message, calls
    # snooze and returns the message.
    #
    # @return [Object] the received message
    def receive
      # Test emptiness rather than truthiness of the head element: nil and
      # false are legitimate messages.
      if !@queued_messages || @queued_messages.empty?
        wait_for_message
      else
        value = @queued_messages.shift
        snooze
        value
      end
    end

    # Suspends the current fiber until it is scheduled again, flagging this
    # coprocess as waiting so that #<< delivers directly instead of queueing.
    # Gyro.ref / Gyro.unref bracket the wait; the ensure clause guarantees the
    # unref and the flag reset even if the fiber is resumed with an exception.
    #
    # @return whatever suspend returns (presumably the value scheduled by #<<)
    def wait_for_message
      Gyro.ref
      @receive_waiting = true
      suspend
    ensure
      Gyro.unref
      @receive_waiting = nil
    end
  end

  include Messaging

  # Registry of currently executing coprocesses, keyed by their fiber. Entries
  # are added in #execute and removed in #finish_execution.
  @@map = {}

  # @return [Hash] the fiber => coprocess registry
  def self.map
    @@map
  end

  # @return [Integer] the number of registered (executing) coprocesses
  def self.count
    @@map.size
  end

  attr_reader :result, :fiber

  # @param fiber [Fiber, nil] an existing fiber to associate with the coprocess
  # @param block [Proc] the task body; it is called with the coprocess itself
  def initialize(fiber = nil, &block)
    @fiber = fiber
    @block = block
  end

  # Returns the "file:line" location at which the task block was defined.
  #
  # @return [String, nil] nil when there is no block, or when the block has no
  #   Ruby source location (Proc#source_location returns nil for procs that
  #   were not defined in Ruby, e.g. those created from `&:symbol`)
  def location
    @block&.source_location&.join(':')
  end

  # @return [Array<String>, nil] the fiber's caller stack with its first two
  #   entries dropped, or nil if the coprocess has no fiber
  def caller
    @fiber ? @fiber.caller[2..-1] : nil
  end

  # Creates the coprocess fiber and schedules it. The fiber that called #run is
  # remembered so that an uncaught exception can later be reported to it (see
  # #finish_execution).
  #
  # @return [Coprocess] self
  def run
    @calling_fiber = Fiber.current
    @fiber = Fiber.new(location) { |v| execute(v) }
    @fiber.schedule
    @ran = true
    self
  end

  # Fiber body: registers the coprocess, runs the block and records its
  # outcome in @result.
  #
  # @param first_value [Object] the first value passed to the fiber
  def execute(first_value)
    # The first value passed to the coprocess can be used to stop it before it
    # is scheduled for the first time
    raise first_value if first_value.is_a?(Exception)

    @@map[@fiber] = @fiber.coprocess = self
    @result = @block.call(self)
  rescue Exceptions::MoveOn => e
    # MoveOn is a normal way of stopping the coprocess: its value becomes the
    # result and it is not treated as an error.
    @result = e.value
  rescue Exception => e
    # Every other exception is captured rather than allowed to unwind the
    # fiber, so that it can be handed over to another fiber in
    # #finish_execution.
    uncaught_exception = true
    @result = e
  ensure
    finish_execution(uncaught_exception)
  end

  # Tears down the coprocess once its block is done: unregisters it, detaches
  # the fiber, hands the result to the awaiting fiber (if any) and invokes the
  # #when_done callback (if any).
  #
  # @param uncaught_exception [Boolean, nil] truthy when @result holds an
  #   exception other than MoveOn
  def finish_execution(uncaught_exception)
    @@map.delete(@fiber)
    @fiber.coprocess = nil
    @fiber = nil
    @awaiting_fiber&.schedule @result
    @when_done&.()

    return unless uncaught_exception && !@awaiting_fiber

    # if no awaiting fiber, raise any uncaught error by passing it to the
    # calling fiber, or to the root fiber if the calling fiber is no longer
    # alive
    calling_fiber_alive = @calling_fiber && @calling_fiber.state != :dead
    calling_fiber = calling_fiber_alive ? @calling_fiber : Fiber.root
    calling_fiber.transfer @result
  end

  # @return [Fiber, nil] the coprocess fiber (truthy) while the coprocess has
  #   one, nil once it has finished
  def alive?
    @fiber
  end

  # Waits for the coprocess to finish (starting it first if it has not been
  # run yet) and returns its result. Also available as #join.
  #
  # NOTE(review): the previous comment here said that Kernel.await expects a
  # callable and referred to this method as #call; no #call is defined in this
  # file - confirm how Kernel.await invokes the coprocess.
  #
  # @return [Object] the value returned by #await_coprocess_result
  def await
    await_coprocess_result
  ensure
    # If the awaiting fiber has been transferred an exception, the awaited fiber
    # might still be running, so we need to stop it
    @fiber&.schedule(Exceptions::MoveOn.new)
  end
  alias_method :join, :await

  # Runs the coprocess if needed. While it still has a fiber, registers the
  # current fiber as the awaiting fiber and suspends; otherwise returns the
  # already stored result.
  #
  # @return [Object] the value suspend returns, or @result when the coprocess
  #   has already finished
  def await_coprocess_result
    run unless @ran
    if @fiber
      @awaiting_fiber = Fiber.current
      suspend
    else
      @result
    end
  end

  # Registers a callback that #finish_execution invokes (with no arguments)
  # when the coprocess finishes. A later call replaces an earlier callback.
  #
  # @return [Proc] the given block
  def when_done(&block)
    @when_done = block
  end

  # Schedules the coprocess fiber with the given value, without yielding.
  # Does nothing if the coprocess has no fiber.
  #
  # @param value [Object, nil] value passed to Fiber#schedule
  def schedule(value = nil)
    @fiber&.schedule(value)
  end

  # Schedules the coprocess fiber with the given value and then calls snooze.
  # Does nothing if the coprocess has no fiber.
  #
  # @param value [Object, nil] value passed to Fiber#schedule
  def resume(value = nil)
    return unless @fiber

    @fiber.schedule(value)
    snooze
  end

  # Stops the coprocess by scheduling its fiber with a MoveOn exception
  # carrying the given value, then calls snooze. When the exception is rescued
  # in #execute, its value becomes the coprocess result. Also available as
  # #stop. Does nothing if the coprocess has no fiber.
  #
  # @param value [Object, nil] value carried by the MoveOn exception
  def interrupt(value = nil)
    return unless @fiber

    @fiber.schedule(Exceptions::MoveOn.new(nil, value))
    snooze
  end
  alias_method :stop, :interrupt

  # Stops the coprocess by scheduling its fiber with a Cancel exception, then
  # calls snooze. Unlike MoveOn, Cancel is not rescued specially in #execute,
  # so it is recorded as an uncaught exception. Does nothing if the coprocess
  # has no fiber.
  def cancel!
    return unless @fiber

    @fiber.schedule(Exceptions::Cancel.new)
    snooze
  end

  # @return [Coprocess, nil] the coprocess attached to the current fiber
  def self.current
    Fiber.current.coprocess
  end
end