Sha256: 2a16b668a7083e49d38fd6206833666b8bb35ee77d7344a4650506b02d36c552

Contents?: true

Size: 1.24 KB

Versions: 3

Compression:

Stored size: 1.24 KB

Contents

# frozen_string_literal: true

# Fetches a single enqueued pipeline plan and executes it.
#
# The plan is claimed (marked as running) inside a transaction with a row
# lock, handed to the injected runner, and finally marked as finished or
# failed depending on the outcome.
class MedPipe::PipelinePlanConsumer
  # @param [Object] pipeline_group object whose `pipeline_plans` are searched for an enqueued plan
  # @param [Proc] pipeline_runner callable that builds a pipeline from a pipeline plan and runs it
  def initialize(pipeline_group:, pipeline_runner:)
    @pipeline_group = pipeline_group
    @pipeline_runner = pipeline_runner
  end

  # Claims the highest-priority enqueued pipeline plan and runs it.
  #
  # @return [PipelinePlan, nil] the pipeline plan that was executed, or nil when none is enqueued
  # @raise [StandardError] re-raises whatever the runner (or the status update) raised,
  #   after marking the plan as failed
  def run
    # Deliberately outside the rescue below: if claiming a plan fails there is
    # no plan to mark as failed, and the original error must reach the caller.
    pipeline_plan = fetch_and_run_pipeline_plan
    return if pipeline_plan.nil?

    begin
      @pipeline_runner.call(pipeline_plan)
      complete_pipeline_plan(pipeline_plan)
    rescue StandardError
      error_pipeline_plan(pipeline_plan)
      raise
    end

    pipeline_plan
  end

  private

  # Locks the highest-priority enqueued plan and marks it as running.
  #
  # @return [PipelinePlan, nil] the claimed plan, or nil when nothing is enqueued
  def fetch_and_run_pipeline_plan
    ApplicationRecord.transaction do
      target_pipeline_plan = @pipeline_group.pipeline_plans.lock.status_enqueued.order(priority: :desc).first
      # Safe navigation instead of an early `return`: a non-local exit from a
      # transaction block has been handled differently across Rails versions.
      target_pipeline_plan&.update!(status: :running, started_at: Time.current)
      target_pipeline_plan
    end
  end

  # Marks the plan as finished and records the finish time.
  def complete_pipeline_plan(pipeline_plan)
    pipeline_plan.update!(status: :finished, finished_at: Time.current)
  end

  # Marks the plan as failed.
  def error_pipeline_plan(pipeline_plan)
    pipeline_plan.update!(status: :failed)
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
med_pipe-0.2.0 lib/med_pipe/pipeline_plan_consumer.rb
med_pipe-0.1.1 lib/med_pipe/pipeline_plan_consumer.rb
med_pipe-0.1.0.5 lib/med_pipe/pipeline_plan_consumer.rb