Sha256: c32c07eb116969b6922f7c66367091e6beadc99080d85794a8f28857739f2cd3
Contents?: true
Size: 1.04 KB
Versions: 4
Compression:
Stored size: 1.04 KB
Contents
# frozen_string_literal: true # prioryty が高いものから順に、並列数を踏まえて複数のPipelinePlanの状態をenqueuedに変更する class MedPipe::PipelinePlanProducer # @param pipeline_group [MedPipe::PipelineGroup] def initialize(pipeline_group) @pipeline_group = pipeline_group end # @return [Array<MedPipe::PipelinePlan>] Enqueued pipeline plans. 未実行ならnilを返す def run return if @pipeline_group.parallel_limit <= 0 @pipeline_group.with_lock do enqueue_count = @pipeline_group.parallel_limit - @pipeline_group.pipeline_plans.active.count enqueue(enqueue_count) if enqueue_count.positive? end end private def enqueue(size) target_pipeline_plans = fetch_target_pipeline_plans(size: size) return if target_pipeline_plans.empty? target_pipeline_plans.each do |pipline_plan| pipline_plan.update!(status: :enqueued) end end def fetch_target_pipeline_plans(size:) @pipeline_group.pipeline_plans.status_waiting.order(priority: :desc).limit(size) end end
Version data entries
4 entries across 4 versions & 1 rubygems