Sha256: 9f8ccda33b17664a1c8f5430225a3f33e080e8be36d6881eab99e2fe19cefe0f

Contents?: true

Size: 1.38 KB

Versions: 2

Compression:

Stored size: 1.38 KB

Contents

# frozen_string_literal: true

module Dynflow
  module Executors
    module Sidekiq
      module WorkerJobs
        # Worker-side job that executes a single work item and reports the
        # outcome back through the orchestrator jobs.
        class PerformWork < InternalJobBase
          # Executes +work_item+ inside the telemetry wrapper.
          #
          # A persistence error raised while executing is forwarded to
          # OrchestratorJobs::HandlePersistenceError instead of propagating.
          # Regardless of how execution ended, OrchestratorJobs::WorkerDone is
          # enqueued afterwards.
          #
          # @param work_item the work item to execute
          def perform(work_item)
            with_telemetry(work_item) { execute_work_item(work_item) }
          rescue Errors::PersistenceError => error
            OrchestratorJobs::HandlePersistenceError.perform_async(error, work_item)
          ensure
            announce_worker_done(work_item)
          end

          private

          # Assigns the process-wide world to the work item and executes it
          # through Executors.run_user_code.
          def execute_work_item(work_item)
            Executors.run_user_code do
              work_item.world = Dynflow.process_world
              work_item.execute
            end
          end

          # Enqueues the WorkerDone orchestrator job for +work_item+. For step
          # work items the step's delayed events are passed along; for any
          # other kind of work item the second argument is nil.
          def announce_worker_done(work_item)
            step = work_item.is_a?(Director::StepWorkItem) ? work_item.step : nil
            delayed_events = step && step.delayed_events
            OrchestratorJobs::WorkerDone.perform_async(work_item, delayed_events)
          end

          # Yields to the given block, updating telemetry before and after.
          #
          # Before yielding, the :dynflow_active_workers gauge is set with +1.
          # Afterwards (even if the block raised) the :dynflow_worker_events
          # counter is incremented by 1 and the gauge is set with -1.
          #
          # NOTE(review): the gauge receives the integers +1 / -1; whether the
          # telemetry adapter treats them as relative adjustments or absolute
          # values is not visible here - confirm against the adapter.
          def with_telemetry(work_item)
            Dynflow::Telemetry.with_instance do |telemetry|
              telemetry.set_gauge(:dynflow_active_workers, +1, telemetry_options(work_item))
            end
            yield
          ensure
            Dynflow::Telemetry.with_instance do |telemetry|
              telemetry.increment_counter(:dynflow_worker_events, 1, telemetry_options(work_item))
              telemetry.set_gauge(:dynflow_active_workers, -1, telemetry_options(work_item))
            end
          end
        end

        # Job whose only effect is to enqueue the StartupComplete orchestrator
        # job for the given world id.
        class DrainMarker < InternalJobBase
          # @param world_id identifier passed through unchanged to
          #   OrchestratorJobs::StartupComplete
          def perform(world_id)
            startup_complete = OrchestratorJobs::StartupComplete
            startup_complete.perform_async(world_id)
          end
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
dynflow-1.9.0 lib/dynflow/executors/sidekiq/worker_jobs.rb
dynflow-1.8.3 lib/dynflow/executors/sidekiq/worker_jobs.rb