Sha256: aa56db8584f0dcfafe55379c12a7e899d5cef575b29b37f539eb0de2b109639d

Size: 621 Bytes

Versions: 4

Stored size: 621 Bytes

Contents

# frozen_string_literal: true

require 'fiber'

module CottonTail
  module Queue
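    # Reads messages from a queue and hands each one to an on_message
    # callback until the queue is both closed and empty.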
    class Reader
      # Starts a Reader in a background thread and returns that Thread.
      def self.spawn(queue, **kwargs)
        Thread.new { new(queue, **kwargs).start }
      end

      def initialize(queue, on_message:)
        @queue = queue
        @on_message = on_message
      end

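      # Lazily built Fiber that yields one popped message per resume and
      # finishes once the queue is closed and drained.
      def fiber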
        @fiber ||= Fiber.new do
          Fiber.yield @queue.pop until @queue.empty? && @queue.closed?
        end
      end

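      # Drains the queue, splatting each message into on_message. Falsy
      # values (such as the nil a closed, empty Queue#pop returns) are skipped.
      def start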
        while fiber.alive?
          args = fiber.resume
          @on_message.call(*args) if args
        end
      end
    end
  end
end
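
Usage sketch

The listing above does not show how the reader is driven, so here is a minimal sketch of the same fiber-based drain loop built only on Ruby's standard Queue. It does not load the cotton_tail gem. The names received, on_message, and reader are hypothetical and chosen for illustration, and the sketch assumes that messages are pushed as arrays, since the reader splats each one into the callback.

```ruby
require 'fiber'

# Stand-in for the Reader pattern above, using only the standard library.
queue = Queue.new
received = []

# Hypothetical callback: each message is an [id, body] pair.
on_message = ->(id, body) { received << [id, body] }

reader = Thread.new do
  # The fiber is created inside the thread that resumes it.
  fiber = Fiber.new do
    Fiber.yield queue.pop until queue.empty? && queue.closed?
  end

  while fiber.alive?
    args = fiber.resume
    # nil arrives when the queue is closed while empty, or when the
    # fiber finishes; both cases are skipped.
    on_message.call(*args) if args
  end
end

queue << [1, 'a']
queue << [2, 'b']
queue.close  # lets the reader finish once the queue is drained
reader.join
```

Closing the queue is what ends the loop: without it, pop blocks indefinitely on an empty queue and the thread never finishes. With the gem loaded, CottonTail::Queue::Reader.spawn(queue, on_message: on_message) would presumably play the role of the Thread.new block here.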

Version data entries

4 entries across 4 versions and 1 rubygem

Version Path
cotton-tail-0.2.1 lib/cotton_tail/queue/reader.rb
cotton-tail-0.2.0 lib/cotton_tail/queue/reader.rb
cotton-tail-0.1.2 lib/cotton_tail/queue/reader.rb
cotton-tail-0.1.1 lib/cotton_tail/queue/reader.rb