Sha256: 8d2b8a6c5e4eb34395f21680e84afa2322c78c40a9112f91f962d184776779b8

Contents?: true

Size: 710 Bytes

Versions: 1

Compression:

Stored size: 710 Bytes

Contents

# frozen_string_literal: true

require 'fiber'

module CottonTail
  module Queue
    # Pulls messages off a queue one at a time and hands each of them,
    # together with the app, to the app's configured middleware.
    #
    # The queue object must respond to +pop+, +empty?+ and +closed?+; the app
    # must respond to +config+, whose result responds to +middleware+.
    class Reader
      # Builds a reader and runs it to completion on a new thread.
      #
      # @param queue [#pop, #empty?, #closed?] source of messages
      # @param kwargs [Hash] keyword arguments forwarded to {#initialize}
      # @return [Thread] the thread running the reader
      def self.spawn(queue, **kwargs)
        Thread.new do
          reader = new(queue, **kwargs)
          reader.start
        end
      end

      # @param queue [#pop, #empty?, #closed?] source of messages
      # @param app [#config] application whose middleware receives messages
      def initialize(queue, app:)
        @queue = queue
        @app = app
      end

      # Memoized fiber that yields one popped message per resume and finishes
      # (returning nil) once the queue is both empty and closed.
      #
      # @return [Fiber]
      def fiber
        @fiber ||= Fiber.new do
          until drained?
            Fiber.yield(@queue.pop)
          end
        end
      end

      # Keeps dispatching messages until the fiber has finished.
      #
      # @return [nil]
      def start
        while fiber.alive?
          call_next
        end
      end

      private

      # Resumes the fiber for the next message and passes it to the
      # middleware as +[app, *message]+. A falsy value is skipped: that is
      # what the fiber returns when it finishes, and nil/false messages are
      # ignored the same way.
      def call_next
        message = fiber.resume
        return unless message

        middleware.call([@app, *message])
      end

      # True once nothing is left to read and the queue has been closed.
      def drained?
        @queue.empty? && @queue.closed?
      end

      # Middleware callable taken from the app's configuration.
      def middleware
        settings = @app.config
        settings.middleware
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
cotton-tail-0.3.0 lib/cotton_tail/queue/reader.rb