Sha256: 6d43fe94ba7d7865761983342f093e9f7f2ef101410f1f4ab468e6586e6ca27f
Contents?: true
Size: 978 Bytes
Versions: 6
Compression:
Stored size: 978 Bytes
Contents
# frozen_string_literal: true export :process, :setup, :size=, :busy? @size = 10 def process(&block) setup unless @task_queue start_task_on_thread(block) end def start_task_on_thread(block) EV.ref @task_queue << [block, Fiber.current] suspend ensure EV.unref end def size=(size) @size = size end def busy? !@queue.empty? end def setup @task_queue = ::Queue.new @resolve_queue = ::Queue.new @async_watcher = EV::Async.new { resolve_from_queue } EV.unref @threads = (1..@size).map { Thread.new { thread_loop } } end def resolve_from_queue until @resolve_queue.empty? (fiber, result) = @resolve_queue.pop(true) fiber.transfer result unless fiber.cancelled? end end def thread_loop loop { run_queued_task } end def run_queued_task (block, fiber) = @task_queue.pop result = block.() @resolve_queue << [fiber, result] @async_watcher.signal! rescue Exception => e @resolve_queue << [fiber, e] @async_watcher.signal! end
Version data entries
6 entries across 6 versions & 1 rubygems