Sha256: c0f2b1c4a4568c1a43128c673c8e8f8949394b0e283c50621c9aa0d2ce20a378

Contents?: true

Size: 1.69 KB

Versions: 25

Compression:

Stored size: 1.69 KB

Contents

# frozen_string_literal: true
require 'dynflow/executors/parallel/pool'
require 'dynflow/executors/parallel/worker'

module Dynflow
  module Executors
    class Parallel
      # Core of the parallel executor. It keeps one worker pool per configured
      # queue in @pools (keyed by queue name) and relays scheduling, status
      # and termination messages to those pools.
      class Core < Abstract::Core
        attr_reader :logger

        # Delegates to the parent initializer with the same three arguments,
        # then creates the (initially empty) pool registry and fills it.
        def initialize(world, heartbeat_interval, queues_options)
          super
          @pools = {}
          initialize_queues
        end

        # Spawns a Pool for every entry in @queues_options and registers it in
        # @pools under the queue's name. A queue that does not specify its own
        # :pool_size falls back to the :pool_size of the :default queue.
        def initialize_queues
          fallback_size = @queues_options[:default][:pool_size]
          @queues_options.each do |name, options|
            size = options.fetch(:pool_size, fallback_size)
            spawned = Pool.spawn("pool #{name}", @world, reference, name, size, @world.transaction_adapter)
            @pools[name] = spawned
          end
        end

        # Runs the parent's termination start (forwarding all arguments) and
        # then sends :start_termination, together with a fresh resolvable
        # future, to each pool registered at that moment.
        def start_termination(*args)
          super
          registered = @pools.values
          registered.each do |pool|
            termination_future = Concurrent::Promises.resolvable_future
            pool.tell([:start_termination, termination_future])
          end
        end

        # Unregisters the pool named pool_name. The parent's
        # finish_termination is invoked (with no arguments) only once the
        # registry is empty, since this message is expected from every
        # worker pool.
        def finish_termination(pool_name)
          @pools.delete(pool_name)
          super() if @pools.empty?
        end

        # Asks each pool for its :execution_status, passing along
        # execution_plan_id (nil by default), and returns a hash mapping each
        # pool name to that pool's answer.
        def execution_status(execution_plan_id = nil)
          @pools.map do |name, pool|
            [name, pool.ask!([:execution_status, execution_plan_id])]
          end.to_h
        end

        # Assigns @world to every work item and sends the item in a
        # :schedule_work message to the pool chosen by suggest_queue.
        # Hash#fetch is used, so an unregistered queue name raises KeyError.
        def feed_pool(work_items)
          work_items.each do |item|
            item.world = @world
            target_pool = @pools.fetch(suggest_queue(item))
            target_pool.tell([:schedule_work, item])
          end
        end
      end
    end
  end
end

Version data entries

25 entries across 25 versions & 1 rubygem

Version Path
dynflow-1.8.2 lib/dynflow/executors/parallel/core.rb
dynflow-1.8.1 lib/dynflow/executors/parallel/core.rb
dynflow-1.8.0 lib/dynflow/executors/parallel/core.rb
dynflow-1.7.0 lib/dynflow/executors/parallel/core.rb
dynflow-1.6.11 lib/dynflow/executors/parallel/core.rb
dynflow-1.6.10 lib/dynflow/executors/parallel/core.rb
dynflow-1.6.8 lib/dynflow/executors/parallel/core.rb
dynflow-1.6.7 lib/dynflow/executors/parallel/core.rb
dynflow-1.6.6 lib/dynflow/executors/parallel/core.rb
dynflow-1.6.5 lib/dynflow/executors/parallel/core.rb
dynflow-1.6.4 lib/dynflow/executors/parallel/core.rb
dynflow-1.6.3 lib/dynflow/executors/parallel/core.rb
dynflow-1.6.2 lib/dynflow/executors/parallel/core.rb
dynflow-1.6.1 lib/dynflow/executors/parallel/core.rb
dynflow-1.4.9 lib/dynflow/executors/parallel/core.rb
dynflow-1.4.8 lib/dynflow/executors/parallel/core.rb
dynflow-1.5.0 lib/dynflow/executors/parallel/core.rb
dynflow-1.4.7 lib/dynflow/executors/parallel/core.rb
dynflow-1.4.6 lib/dynflow/executors/parallel/core.rb
dynflow-1.4.5 lib/dynflow/executors/parallel/core.rb