Sha256: aa56db8584f0dcfafe55379c12a7e899d5cef575b29b37f539eb0de2b109639d
Contents?: true
Size: 621 Bytes
Versions: 4
Compression:
Stored size: 621 Bytes
Contents
# frozen_string_literal: true

require 'fiber'

module CottonTail
  module Queue
    # Pulls messages off a queue and forwards each one to a callback.
    #
    # The queue only needs to respond to +pop+, +empty?+ and +closed?+.
    # Reading stops once the queue reports that it is both empty and closed.
    class Reader
      class << self
        # Builds a reader with the given arguments and runs it on a new thread.
        #
        # @param queue [#pop, #empty?, #closed?] source of messages
        # @param kwargs [Hash] keyword arguments forwarded to {#initialize}
        # @return [Thread] the thread executing {#start}
        def spawn(queue, **kwargs)
          Thread.new do
            reader = new(queue, **kwargs)
            reader.start
          end
        end
      end

      # @param queue [#pop, #empty?, #closed?] source of messages
      # @param on_message [#call] invoked with each popped message, splatted
      def initialize(queue, on_message:)
        @queue = queue
        @on_message = on_message
      end

      # Lazily built, memoized fiber that yields one popped message per resume.
      # The fiber finishes (returning nil) when the queue is empty and closed.
      #
      # @return [Fiber]
      def fiber
        @fiber ||= Fiber.new do
          # Equivalent to "until empty? && closed?"; closed? is only consulted
          # once the queue reports empty.
          while !@queue.empty? || !@queue.closed?
            Fiber.yield(@queue.pop)
          end
        end
      end

      # Resumes the fiber until it is finished, dispatching every truthy value
      # it produces to the callback.
      #
      # @return [nil]
      def start
        while fiber.alive?
          message = fiber.resume
          # Falsy results (a nil/false pop, or the fiber's final return value)
          # carry no message and are skipped.
          next unless message

          @on_message.call(*message)
        end
      end
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems