Sha256: 2a16b668a7083e49d38fd6206833666b8bb35ee77d7344a4650506b02d36c552
Contents?: true
Size: 1.24 KB
Versions: 3
Compression:
Stored size: 1.24 KB
Contents
# frozen_string_literal: true

# Fetches a single enqueued pipeline plan from a pipeline group and executes it.
#
# Lifecycle of a consumed plan, as driven by this class:
#   enqueued -> running  (claimed inside a transaction, +started_at+ set)
#   running  -> finished (runner returned normally, +finished_at+ set)
#   running  -> failed   (runner or completion raised a StandardError)
class MedPipe::PipelinePlanConsumer
  # @param pipeline_group the object whose +pipeline_plans+ association is consumed
  # @param [Proc] pipeline_runner callable that builds a pipeline from a
  #   pipeline plan and runs it; invoked as +pipeline_runner.call(pipeline_plan)+
  def initialize(pipeline_group:, pipeline_runner:)
    @pipeline_group = pipeline_group
    @pipeline_runner = pipeline_runner
  end

  # Claims the highest-priority enqueued plan, runs it and records the outcome.
  #
  # @return [PipelinePlan, nil] the pipeline plan that was executed, or nil
  #   when no enqueued plan was available
  # @raise [StandardError] re-raises whatever the fetch, the runner or the
  #   completion update raised, after marking a claimed plan as failed
  def run
    pipeline_plan = fetch_and_run_pipeline_plan
    return nil if pipeline_plan.nil?

    @pipeline_runner.call(pipeline_plan)
    complete_pipeline_plan(pipeline_plan)
    pipeline_plan
  rescue StandardError
    # When the fetch itself raised, no plan was claimed and +pipeline_plan+ is
    # still nil. Marking nil as failed would raise NoMethodError and hide the
    # original error, so only record a failure for a plan we actually hold.
    error_pipeline_plan(pipeline_plan) unless pipeline_plan.nil?
    raise
  end

  private

  # Selects the enqueued plan with the highest priority and flips it to
  # +running+ within one transaction.
  # NOTE(review): +lock+ and +status_enqueued+ are presumably an ActiveRecord
  # row lock and an enum scope defined on the plan model - confirm there.
  #
  # @return [PipelinePlan, nil] the claimed plan, or nil when none is enqueued
  def fetch_and_run_pipeline_plan
    ApplicationRecord.transaction do
      target_pipeline_plan = @pipeline_group.pipeline_plans.lock.status_enqueued.order(priority: :desc).first
      # +next+ leaves the block normally with nil, so the transaction finishes
      # through its regular path instead of being exited by a method return.
      next nil if target_pipeline_plan.nil?

      target_pipeline_plan.update!(status: :running, started_at: Time.current)
      target_pipeline_plan
    end
  end

  # Marks the plan as finished and stamps the completion time.
  def complete_pipeline_plan(pipeline_plan)
    pipeline_plan.update!(status: :finished, finished_at: Time.current)
  end

  # Marks the plan as failed.
  def error_pipeline_plan(pipeline_plan)
    pipeline_plan.update!(status: :failed)
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
med_pipe-0.2.0 | lib/med_pipe/pipeline_plan_consumer.rb |
med_pipe-0.1.1 | lib/med_pipe/pipeline_plan_consumer.rb |
med_pipe-0.1.0.5 | lib/med_pipe/pipeline_plan_consumer.rb |