Sha256: c936edbb868bfbbe5d6b697ae997638862dc3b661d03575df225bd13ce142a06

Contents?: true

Size: 1.09 KB

Versions: 1

Compression:

Stored size: 1.09 KB

Contents

# frozen_string_literal: true

module Simplekiq
  class OrchestrationExecutor
    def self.execute(workflow:, parent_batch:)
      new.run_step(parent_batch, workflow, 0)
    end

    def run_step(parent_batch, workflow, step)
      return if workflow.empty?

      nest_under(parent_batch) do
        *jobs = workflow.at(step)
        sidekiq_batch = Sidekiq::Batch.new
        sidekiq_batch.on(
          :success,
          self.class,
          "orchestration_workflow" => workflow, "step" => step + 1
        )

        sidekiq_batch.jobs do
          jobs.each do |job|
            Object.const_get(job["klass"]).perform_async(*job["args"])
          end
        end
      end
    end

    def nest_under(parent_batch)
      if parent_batch
        parent_batch.jobs do
          yield
        end
      else
        yield
      end
    end

    def on_success(status, options)
      return if options["step"] == options["orchestration_workflow"].length

      parent_batch = Sidekiq::Batch.new(status.parent_bid)
      run_step(parent_batch, options["orchestration_workflow"], options["step"])
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
simplekiq-0.0.3 lib/simplekiq/orchestration_executor.rb