Sha256: ece5bd3259f809ff4932846dc4e5d43200dfff6683aed447e06fa3f8a5da8024

Contents?: true

Size: 1.88 KB

Versions: 7

Compression:

Stored size: 1.88 KB

Contents

# frozen_string_literal: true

module Ductr
  #
  # Representation of a pipeline's step.
  # Hold a fiber to execute steps concurrently.
  #
  class PipelineStep
    extend Forwardable

    #
    # @!method resume
    #   Resumes the step's fiber.
    #   @return [void]
    # @!method alive?
    #   Check if the step's fiber is running.
    #   @return [Boolean] True if step's fiber is running
    #
    def_delegators :fiber, :resume, :alive?

    # @return [Pipeline] The step's pipeline
    attr_reader :pipeline
    # @return [Symbol] The step's name
    attr_reader :name
    # @return [Array<Job>] The step's queued jobs
    attr_reader :jobs

    # @return [PipelineStep] The previous step
    attr_accessor :left

    #
    # Creates a step for the given pipeline.
    #
    # @param [Pipeline] pipeline The pipeline containing step's method
    # @param [Symbol] The name of the pipeline's step method
    #
    def initialize(pipeline, name)
      @pipeline = pipeline
      @name = name

      @jobs = []
      @left = []
    end

    #
    # Track, registers and enqueues the given job.
    #
    # @param [Job] job The job to enqueue
    #
    # @return [void]
    #
    def enqueue_job(job)
      jobs.push(job)
      Store.register_job(job)
      job.enqueue
    end

    #
    # Check if the step is done.
    #
    # @return [Boolean] True if the step is done
    #
    def done?
      !fiber.alive?
    end

    #
    # Waits until all step's jobs are stopped.
    #
    # @return [void]
    #
    def flush_jobs
      return if jobs.empty?

      Fiber.yield until Store.read_jobs(*jobs).all?(&:stopped?)
    end

    #
    # The step's fiber instance, invokes the pipeline's step method.
    #
    # @return [Fiber] The step's fiber
    #
    def fiber
      @fiber ||= Fiber.new do
        Fiber.yield until left.all?(&:done?)

        pipeline.send(name)
        flush_jobs
      end
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
ductr-0.2.3 lib/ductr/pipeline_step.rb
ductr-0.2.2 lib/ductr/pipeline_step.rb
ductr-0.2.1 lib/ductr/pipeline_step.rb
ductr-0.2.0 lib/ductr/pipeline_step.rb
ductr-0.1.2 lib/ductr/pipeline_step.rb
ductr-0.1.1 lib/ductr/pipeline_step.rb
ductr-0.1.0 lib/ductr/pipeline_step.rb