lib/bolt/executor.rb in bolt-2.34.0 vs lib/bolt/executor.rb in bolt-2.35.0

- old
+ new

@@ -15,10 +15,11 @@ require 'bolt/transport/winrm' require 'bolt/transport/orch' require 'bolt/transport/local' require 'bolt/transport/docker' require 'bolt/transport/remote' +require 'bolt/yarn' module Bolt TRANSPORTS = { ssh: Bolt::Transport::SSH, winrm: Bolt::Transport::WinRM, @@ -27,11 +28,11 @@ docker: Bolt::Transport::Docker, remote: Bolt::Transport::Remote }.freeze class Executor - attr_reader :noop, :transports + attr_reader :noop, :transports, :in_parallel attr_accessor :run_as def initialize(concurrency = 1, analytics = Bolt::Analytics::NoopClient.new, noop = false, @@ -58,10 +59,11 @@ @publisher = Concurrent::SingleThreadExecutor.new @publisher.post { Thread.current[:name] = 'event-publisher' } @noop = noop @run_as = nil + @in_parallel = false @pool = if concurrency > 0 Concurrent::ThreadPoolExecutor.new(name: 'exec', max_threads: concurrency) else Concurrent.global_immediate_executor end @@ -82,10 +84,18 @@ def subscribe(subscriber, types = nil) @subscribers[subscriber] = types self end + def unsubscribe(subscriber, types = nil) + if types.nil? || types.sort == @subscribers[subscriber]&.sort + @subscribers.delete(subscriber) + elsif @subscribers[subscriber].is_a?(Array) + @subscribers[subscriber] = @subscribers[subscriber] - types + end + end + def publish_event(event) @subscribers.each do |subscriber, types| # If types isn't set or if the subscriber is subscribed to # that type of event, publish the event next unless types.nil? || types.include?(event[:type]) @@ -355,9 +365,85 @@ end end def run_plan(scope, plan, params) plan.call_by_name_with_scope(scope, params, true) + end + + def create_yarn(scope, block, object, index) + fiber = Fiber.new do + # Create the new scope + newscope = Puppet::Parser::Scope.new(scope.compiler) + local = Puppet::Parser::Scope::LocalScope.new + + # Compress the current scopes into a single vars hash to add to the new scope + current_scope = scope.effective_symtable(true) + until current_scope.nil? + current_scope.instance_variable_get(:@symbols)&.each_pair { |k, v| local[k] = v } + current_scope = current_scope.parent + end + newscope.push_ephemerals([local]) + + begin + result = catch(:return) do + args = { block.parameters[0][1].to_s => object } + block.closure.call_by_name_with_scope(newscope, args, true) + end + + # If we got a return from the block, get it's value + # Otherwise the result is the last line from the block + result = result.value if result.is_a?(Puppet::Pops::Evaluator::Return) + + # Validate the result is a PlanResult + unless Puppet::Pops::Types::TypeParser.singleton.parse('Boltlib::PlanResult').instance?(result) + raise Bolt::InvalidParallelResult.new(result.to_s, *Puppet::Pops::PuppetStack.top_of_stack) + end + + result + rescue Puppet::PreformattedError => e + if e.cause.is_a?(Bolt::Error) + e.cause + else + raise e + end + end + end + + Bolt::Yarn.new(fiber, index) + end + + def handle_event(event) + case event[:type] + when :node_result + @thread_completed = true + end + end + + def round_robin(skein) + subscribe(self, [:node_result]) + results = Array.new(skein.length) + @in_parallel = true + + until skein.empty? + @thread_completed = false + r = nil + + skein.each do |yarn| + if yarn.alive? + r = yarn.resume + else + results[yarn.index] = yarn.value + skein.delete(yarn) + end + end + + next unless r == 'unfinished' + sleep(0.1) until @thread_completed || skein.empty? + end + + @in_parallel = false + unsubscribe(self, [:node_result]) + results end class TimeoutError < RuntimeError; end def wait_until_available(targets,