lib/bolt/executor.rb in bolt-2.34.0 vs lib/bolt/executor.rb in bolt-2.35.0
- old
+ new
@@ -15,10 +15,11 @@
require 'bolt/transport/winrm'
require 'bolt/transport/orch'
require 'bolt/transport/local'
require 'bolt/transport/docker'
require 'bolt/transport/remote'
+require 'bolt/yarn'
module Bolt
TRANSPORTS = {
ssh: Bolt::Transport::SSH,
winrm: Bolt::Transport::WinRM,
@@ -27,11 +28,11 @@
docker: Bolt::Transport::Docker,
remote: Bolt::Transport::Remote
}.freeze
class Executor
- attr_reader :noop, :transports
+ attr_reader :noop, :transports, :in_parallel
attr_accessor :run_as
def initialize(concurrency = 1,
analytics = Bolt::Analytics::NoopClient.new,
noop = false,
@@ -58,10 +59,11 @@
@publisher = Concurrent::SingleThreadExecutor.new
@publisher.post { Thread.current[:name] = 'event-publisher' }
@noop = noop
@run_as = nil
+ @in_parallel = false
@pool = if concurrency > 0
Concurrent::ThreadPoolExecutor.new(name: 'exec', max_threads: concurrency)
else
Concurrent.global_immediate_executor
end
@@ -82,10 +84,18 @@
def subscribe(subscriber, types = nil)
@subscribers[subscriber] = types
self
end
+ def unsubscribe(subscriber, types = nil)
+ if types.nil? || types.sort == @subscribers[subscriber]&.sort
+ @subscribers.delete(subscriber)
+ elsif @subscribers[subscriber].is_a?(Array)
+ @subscribers[subscriber] = @subscribers[subscriber] - types
+ end
+ end
+
def publish_event(event)
@subscribers.each do |subscriber, types|
# If types isn't set or if the subscriber is subscribed to
# that type of event, publish the event
next unless types.nil? || types.include?(event[:type])
@@ -355,9 +365,85 @@
end
end
def run_plan(scope, plan, params)
plan.call_by_name_with_scope(scope, params, true)
+ end
+
+ def create_yarn(scope, block, object, index)
+ fiber = Fiber.new do
+ # Create the new scope
+ newscope = Puppet::Parser::Scope.new(scope.compiler)
+ local = Puppet::Parser::Scope::LocalScope.new
+
+ # Compress the current scopes into a single vars hash to add to the new scope
+ current_scope = scope.effective_symtable(true)
+ until current_scope.nil?
+ current_scope.instance_variable_get(:@symbols)&.each_pair { |k, v| local[k] = v }
+ current_scope = current_scope.parent
+ end
+ newscope.push_ephemerals([local])
+
+ begin
+ result = catch(:return) do
+ args = { block.parameters[0][1].to_s => object }
+ block.closure.call_by_name_with_scope(newscope, args, true)
+ end
+
+ # If we got a return from the block, get it's value
+ # Otherwise the result is the last line from the block
+ result = result.value if result.is_a?(Puppet::Pops::Evaluator::Return)
+
+ # Validate the result is a PlanResult
+ unless Puppet::Pops::Types::TypeParser.singleton.parse('Boltlib::PlanResult').instance?(result)
+ raise Bolt::InvalidParallelResult.new(result.to_s, *Puppet::Pops::PuppetStack.top_of_stack)
+ end
+
+ result
+ rescue Puppet::PreformattedError => e
+ if e.cause.is_a?(Bolt::Error)
+ e.cause
+ else
+ raise e
+ end
+ end
+ end
+
+ Bolt::Yarn.new(fiber, index)
+ end
+
+ def handle_event(event)
+ case event[:type]
+ when :node_result
+ @thread_completed = true
+ end
+ end
+
+ def round_robin(skein)
+ subscribe(self, [:node_result])
+ results = Array.new(skein.length)
+ @in_parallel = true
+
+ until skein.empty?
+ @thread_completed = false
+ r = nil
+
+ skein.each do |yarn|
+ if yarn.alive?
+ r = yarn.resume
+ else
+ results[yarn.index] = yarn.value
+ skein.delete(yarn)
+ end
+ end
+
+ next unless r == 'unfinished'
+ sleep(0.1) until @thread_completed || skein.empty?
+ end
+
+ @in_parallel = false
+ unsubscribe(self, [:node_result])
+ results
end
class TimeoutError < RuntimeError; end
def wait_until_available(targets,