# frozen_string_literal: true

# Used for $ERROR_INFO. This *must* be capitalized!
require 'English'
require 'json'
require 'logging'
require 'pathname'
require 'set'
require 'bolt/analytics'
require 'bolt/config'
require 'bolt/fiber_executor'
require 'bolt/puppetdb'
require 'bolt/result'
require 'bolt/result_set'
# Load transports
require 'bolt/transport/docker'
require 'bolt/transport/local'
require 'bolt/transport/lxd'
require 'bolt/transport/orch'
require 'bolt/transport/podman'
require 'bolt/transport/remote'
require 'bolt/transport/ssh'
require 'bolt/transport/winrm'

module Bolt
  TRANSPORTS = {
    docker: Bolt::Transport::Docker,
    local: Bolt::Transport::Local,
    lxd: Bolt::Transport::LXD,
    pcp: Bolt::Transport::Orch,
    podman: Bolt::Transport::Podman,
    remote: Bolt::Transport::Remote,
    ssh: Bolt::Transport::SSH,
    winrm: Bolt::Transport::WinRM
  }.freeze

  class Executor
    attr_reader :noop, :transports, :future
    attr_accessor :run_as

    def initialize(concurrency = 1,
                   analytics = Bolt::Analytics::NoopClient.new,
                   noop = false,
                   modified_concurrency = false,
                   future = {})
      # lazy-load expensive gem code
      require 'concurrent'
      @analytics = analytics
      @logger = Bolt::Logger.logger(self)

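      # Wrap each transport implementation in a Concurrent::Delay so it's only
      # constructed the first time it's needed. The remote transport is
      # constructed with a reference back to this executor.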
      @transports = Bolt::TRANSPORTS.each_with_object({}) do |(key, val), coll|
        coll[key.to_s] = if key == :remote
                           Concurrent::Delay.new do
                             val.new(self)
                           end
                         else
                           Concurrent::Delay.new do
                             val.new
                           end
                         end
      end
      @reported_transports = Set.new
      @subscribers = {}
      @publisher = Concurrent::SingleThreadExecutor.new
      @publisher.post { Thread.current[:name] = 'event-publisher' }

      @noop = noop
      @run_as = nil
      @future = future
      @pool = if concurrency > 0
                Concurrent::ThreadPoolExecutor.new(name: 'exec', max_threads: concurrency)
              else
                Concurrent.global_immediate_executor
              end
      @logger.debug { "Started with #{concurrency} max thread(s)" }

      @concurrency = concurrency
      @warn_concurrency = modified_concurrency
      @fiber_executor = Bolt::FiberExecutor.new
    end

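    # Look up the transport implementation for a protocol name, defaulting to
    # 'ssh'. Transports are wrapped in Concurrent::Delay, so each one is built
    # lazily and memoized on first use.
    #
    # Illustrative usage (assumes an `executor` instance of this class):
    #
    #   executor.transport('winrm')   # => the shared Bolt::Transport::WinRM instance
    #   executor.transport(nil)       # => falls back to the SSH transport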
    def transport(transport)
      impl = @transports[transport || 'ssh']
      raise(Bolt::UnknownTransportError, transport) unless impl
      # If there was an error creating the transport, ensure it gets raised
      impl.no_error!
      impl.value
    end

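    # Register a subscriber for events published via #publish_event. A
    # subscriber is any object that responds to #handle_event; `types`
    # restricts which event types it receives (nil means all events).
    #
    # Illustrative sketch (the `outputter` and `targets` objects here are
    # hypothetical):
    #
    #   executor.subscribe(outputter, %i[step_start step_finish])
    #   executor.publish_event(type: :step_start, description: 'demo', targets: targets)
    #   executor.shutdown   # drain the publisher thread before exiting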
    def subscribe(subscriber, types = nil)
      @subscribers[subscriber] = types
      self
    end

    def unsubscribe(subscriber, types = nil)
      if types.nil? || types.sort == @subscribers[subscriber]&.sort
        @subscribers.delete(subscriber)
      elsif @subscribers[subscriber].is_a?(Array)
        @subscribers[subscriber] = @subscribers[subscriber] - types
      end
    end

    def publish_event(event)
      @subscribers.each do |subscriber, types|
        # If types isn't set or if the subscriber is subscribed to
        # that type of event, publish the event
        next unless types.nil? || types.include?(event[:type])
        @publisher.post(subscriber) do |sub|
          # Wait for the user to respond to a prompt before printing anything
          sleep(0.1) while @prompting
          sub.handle_event(event)
        end
      end
    end

    def shutdown
      @publisher.shutdown
      @publisher.wait_for_termination
    end

    # Starts executing the given block on a list of nodes in parallel, one thread per "batch".
    #
    # This is the main driver of execution on a list of targets. It first
    # groups targets by transport, then divides each group into batches as
    # defined by the transport. Yields each batch, along with the corresponding
    # transport, to the block in turn and returns an array of result promises.
    def queue_execute(targets)
      if @warn_concurrency && targets.length > @concurrency
        @warn_concurrency = false
        msg = "The ulimit is low, which might cause file limit issues. Default concurrency has been set to "\
              "'#{@concurrency}' to mitigate those issues, which might cause Bolt to run slow. "\
              "Disable this warning by configuring ulimit using 'ulimit -n <limit>' in your shell "\
              "configuration, or by configuring Bolt's concurrency. "\
              "See https://puppet.com/docs/bolt/latest/bolt_known_issues.html for details."
        Bolt::Logger.warn("low_ulimit", msg)
      end

      targets.group_by(&:transport).flat_map do |protocol, protocol_targets|
        transport = transport(protocol)
        report_transport(transport, protocol_targets.count)
        transport.batches(protocol_targets).flat_map do |batch|
          batch_promises = Array(batch).each_with_object({}) do |target, h|
            h[target] = Concurrent::Promise.new(executor: :immediate)
          end
          # Pass this argument through to avoid retaining a reference to a
          # local variable that will change on the next iteration of the loop.
          @pool.post(batch_promises) do |result_promises|
            Thread.current[:name] ||= Thread.current.name
            results = yield transport, batch
            Array(results).each do |result|
              result_promises[result.target].set(result)
            end
          # NotImplementedError can be raised if the transport is implemented incorrectly
          rescue StandardError, NotImplementedError => e
            # If an exception happens while running, the result won't be logged
            # by the CLI. Log a warning once per batch, as this is probably a
            # problem with the transport. If batch_* commands are used from the
            # Base transport, then exceptions normally shouldn't reach here.
            @logger.warn(e)
            result_promises.each do |target, promise|
              promise.set(Bolt::Result.from_exception(target, e))
            end
          ensure
            # Make absolutely sure every promise gets a result to avoid a
            # deadlock. Use whatever exception is causing this block to
            # execute, or generate one if we somehow got here without an
            # exception and some promise is still missing a result.
            result_promises.each do |target, promise|
              next if promise.fulfilled?
              error = $ERROR_INFO || Bolt::Error.new("No result was returned for #{target.uri}",
                                                     "puppetlabs.bolt/missing-result-error")
              promise.set(Bolt::Result.from_exception(target, error))
            end
          end
          batch_promises.values
        end
      end
    end

    # Create a ResultSet from the results of all promises.
    def await_results(promises)
      ResultSet.new(promises.map(&:value))
    end

    # Execute the given block on a list of nodes in parallel, one thread per "batch".
    #
    # This is the main driver of execution on a list of targets. It first
    # groups targets by transport, then divides each group into batches as
    # defined by the transport. Each batch, along with the corresponding
    # transport, is yielded to the block in turn and the results all collected
    # into a single ResultSet.
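    #
    # Illustrative sketch (hypothetical command; real callers are the run_*
    # helpers below):
    #
    #   results = executor.batch_execute(targets) do |transport, batch|
    #     transport.batch_command(batch, 'hostname', {}, [])
    #   end
    #   results   # => a Bolt::ResultSet with one Result per target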
    def batch_execute(targets, &block)
      promises = queue_execute(targets, &block)
      await_results(promises)
    end

    def log_action(description, targets)
      publish_event(type: :step_start, description: description, targets: targets)

      start_time = Time.now
      results = yield
      duration = Time.now - start_time

      publish_event(type: :step_finish, description: description, result: results, duration: duration)

      results
    end

    def log_plan(plan_name)
      publish_event(type: :plan_start, plan: plan_name)
      start_time = Time.now

      results = nil
      begin
        results = yield
      ensure
        duration = Time.now - start_time
        publish_event(type: :plan_finish, plan: plan_name, duration: duration)
      end

      results
    end

    private def report_transport(transport, count)
      name = transport.class.name.split('::').last.downcase
      unless @reported_transports.include?(name)
        @analytics&.event('Transport', 'initialize', label: name, value: count)
      end
      @reported_transports.add(name)
    end

    def report_function_call(function)
      @analytics&.event('Plan', 'call_function', label: function)
    end

    def report_bundled_content(mode, name)
      @analytics.report_bundled_content(mode, name)
    end

    def report_file_source(plan_function, source)
      label = Pathname.new(source).absolute? ? 'absolute' : 'module'
      @analytics&.event('Plan', plan_function, label: label)
    end

    def report_noop_mode(noop)
      @analytics&.event('Task', 'noop', label: (!!noop).to_s)
    end

    def report_apply(statement_count, resource_counts)
      data = { statement_count: statement_count }

      unless resource_counts.empty?
        sum = resource_counts.inject(0) { |accum, i| accum + i }
        # Integer division intentionally truncates to a whole number. High precision isn't useful.
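        # e.g. resource_counts of [3, 5, 9] gives (3 + 5 + 9) / 3 == 5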
        data[:resource_mean] = sum / resource_counts.length
      end

      @analytics&.event('Apply', 'ast', **data)
    end

    def report_yaml_plan(plan)
      steps = plan.steps.count
      return_type = case plan.return
                    when Bolt::PAL::YamlPlan::EvaluableString
                      'expression'
                    when nil
                      nil
                    else
                      'value'
                    end

      @analytics&.event('Plan', 'yaml', plan_steps: steps, return_type: return_type)
    rescue StandardError => e
      @logger.trace { "Failed to submit analytics event: #{e.message}" }
    end

    def with_node_logging(description, batch, log_level = :info)
      @logger.send(log_level, "#{description} on #{batch.map(&:safe_name)}")
      publish_event(type: :start_spin)
      result = yield
      publish_event(type: :stop_spin)
      @logger.send(log_level, result.to_json)
      result
    end

    def run_command(targets, command, options = {}, position = [])
      description = options.fetch(:description, "command '#{command}'")
      log_action(description, targets) do
        options[:run_as] = run_as if run_as && !options.key?(:run_as)

        batch_execute(targets) do |transport, batch|
          with_node_logging("Running command '#{command}'", batch) do
            transport.batch_command(batch, command, options, position, &method(:publish_event))
          end
        end
      end
    end

    def run_script(targets, script, arguments, options = {}, position = [])
      description = options.fetch(:description, "script #{script}")
      log_action(description, targets) do
        options[:run_as] = run_as if run_as && !options.key?(:run_as)

        batch_execute(targets) do |transport, batch|
          with_node_logging("Running script #{script} with '#{arguments.to_json}'", batch) do
            transport.batch_script(batch, script, arguments, options, position, &method(:publish_event))
          end
        end
      end
    end

    def run_task(targets, task, arguments, options = {}, position = [], log_level = :info)
      description = options.fetch(:description, "task #{task.name}")
      log_action(description, targets) do
        options[:run_as] = run_as if run_as && !options.key?(:run_as)
        arguments['_task'] = task.name

        batch_execute(targets) do |transport, batch|
          with_node_logging("Running task #{task.name} with '#{arguments.to_json}'", batch, log_level) do
            transport.batch_task(batch, task, arguments, options, position, &method(:publish_event))
          end
        end
      end
    end

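    # Run a task with per-target parameters. `target_mapping` maps each Target
    # to the argument hash that target should receive; '_task' is injected into
    # each hash before execution.
    #
    # Illustrative sketch (hypothetical task and targets):
    #
    #   executor.run_task_with({ target_a => { 'level' => 'info' },
    #                            target_b => { 'level' => 'debug' } }, task)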
    def run_task_with(target_mapping, task, options = {}, position = [])
      targets = target_mapping.keys
      description = options.fetch(:description, "task #{task.name}")

      log_action(description, targets) do
        options[:run_as] = run_as if run_as && !options.key?(:run_as)
        target_mapping.each_value { |arguments| arguments['_task'] = task.name }

        batch_execute(targets) do |transport, batch|
          with_node_logging("Running task #{task.name}'", batch) do
            transport.batch_task_with(batch, task, target_mapping, options, position, &method(:publish_event))
          end
        end
      end
    end

    def upload_file(targets, source, destination, options = {}, position = [])
      description = options.fetch(:description, "file upload from #{source} to #{destination}")
      log_action(description, targets) do
        options[:run_as] = run_as if run_as && !options.key?(:run_as)

        batch_execute(targets) do |transport, batch|
          with_node_logging("Uploading file #{source} to #{destination}", batch) do
            transport.batch_upload(batch, source, destination, options, position, &method(:publish_event))
          end
        end
      end
    end

    def download_file(targets, source, destination, options = {}, position = [])
      description = options.fetch(:description, "file download from #{source} to #{destination}")

      begin
        FileUtils.mkdir_p(destination)
      rescue Errno::EEXIST => e
        message = "#{e.message}; unable to create destination directory #{destination}"
        raise Bolt::Error.new(message, 'bolt/file-exist-error')
      end

      log_action(description, targets) do
        options[:run_as] = run_as if run_as && !options.key?(:run_as)

        batch_execute(targets) do |transport, batch|
          with_node_logging("Downloading file #{source} to #{destination}", batch) do
            transport.batch_download(batch, source, destination, options, position, &method(:publish_event))
          end
        end
      end
    end

    def run_plan(scope, plan, params)
      plan.call_by_name_with_scope(scope, params, true)
    end

    # Delegate to the FiberExecutor to avoid overloading this class, while
    # also minimizing the Puppet lookups needed from plan functions.
    #
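    # Illustrative sketch (hypothetical scope and block; in practice these
    # futures are created by Bolt's plan-level parallelization functions):
    #
    #   future = executor.create_future(scope: scope, name: 'background-step') do
    #     # work to run on its own Fiber
    #   end
    #   executor.wait([future])
    #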
    def create_future(scope: nil, name: nil, &block)
      @fiber_executor.create_future(scope: scope, name: name, &block)
    end

    def plan_complete?
      @fiber_executor.plan_complete?
    end

    def round_robin
      @fiber_executor.round_robin
    end

    def in_parallel?
      @fiber_executor.in_parallel?
    end

    def wait(futures, **opts)
      @fiber_executor.wait(futures, **opts)
    end

    def plan_futures
      @fiber_executor.plan_futures
    end

    # Execute the given block on a background thread while repeatedly yielding
    # control back to the FiberExecutor, so other plan Fibers can make progress.
    # Returns the block's result once the thread completes, or the error that
    # caused it to fail.
    #
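    # Minimal sketch (assumes the caller is already running on a Fiber driven
    # by the FiberExecutor, which is what resumes it after each yield; the
    # blocking call is hypothetical):
    #
    #   data = executor.run_in_thread { some_blocking_io_call }
    #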
    def run_in_thread
      require 'concurrent'
      require 'fiber'
      future = Concurrent::Future.execute do
        yield
      end

      # Used to track how often we resume the same executor function
      still_running = 0
      # While the thread is still running
      while future.incomplete?
        # Each time the Fiber gets resumed, increment the resume tracker. The
        # tracker starts at 1 because the count has to be incremented before
        # yielding; it can't yield first and increment afterwards.
        still_running += 1
        # If the Fiber has been resumed before, still_running will be 2 or
        # more. Yield different values for when the same Fiber is resumed
        # multiple times and when it's resumed the first time in order to know
        # if progress was made in the plan.
        Fiber.yield(still_running < 2 ? :something_happened : :returned_immediately)
      end

      # Once the thread completes, return the result.
      future.value || future.reason
    end

    class TimeoutError < RuntimeError; end

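    # Block until the transport reports each target as connected, retrying
    # every `retry_interval` seconds for up to `wait_time` seconds. Targets
    # that never become available get an error Result instead.
    #
    # Illustrative usage:
    #
    #   executor.wait_until_available(targets, wait_time: 300, retry_interval: 5)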
    def wait_until_available(targets,
                             description: 'wait until available',
                             wait_time: 120,
                             retry_interval: 1)
      log_action(description, targets) do
        batch_execute(targets) do |transport, batch|
          with_node_logging('Waiting until available', batch) do
            wait_until(wait_time, retry_interval) { transport.batch_connected?(batch) }
            batch.map { |target| Result.new(target, action: 'wait_until_available', object: description) }
          rescue TimeoutError => e
            available, unavailable = batch.partition { |target| transport.batch_connected?([target]) }
            (
              available.map { |target| Result.new(target, action: 'wait_until_available', object: description) } +
              unavailable.map { |target| Result.from_exception(target, e, action: 'wait_until_available') }
            )
          end
        end
      end
    end

    # Used to simplify unit testing, to avoid having to mock other calls to Time.now.
    private def wait_now
      Time.now
    end

    private def wait_until(timeout, retry_interval)
      start = wait_now
      until yield
        raise(TimeoutError, 'Timed out waiting for target') if (wait_now - start).to_i >= timeout
        sleep(retry_interval)
      end
    end

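    # Prompt the user on stderr and read the response from stdin. With
    # options[:sensitive] echoing is disabled; with options[:default] an empty
    # response falls back to the default. If stdin is not a TTY, the default is
    # returned (or an error is raised when no default is given).
    #
    # Illustrative usage:
    #
    #   executor.prompt('Password', sensitive: true)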
    def prompt(prompt, options)
      unless $stdin.tty?
        return options[:default] if options[:default]
        raise Bolt::Error.new('STDIN is not a tty, unable to prompt', 'bolt/no-tty-error')
      end

      @prompting = true

      if options[:default] && !options[:sensitive]
        $stderr.print("#{prompt} [#{options[:default]}]: ")
      else
        $stderr.print("#{prompt}: ")
      end

      value = if options[:sensitive]
                $stdin.noecho(&:gets).to_s.chomp
              else
                $stdin.gets.to_s.chomp
              end

      @prompting = false

      $stderr.puts if options[:sensitive]

      value = options[:default] if value.empty?
      value
    end

    # Plan context doesn't make sense for most transports but it is tightly
    # coupled with the orchestrator transport since the transport behaves
    # differently when a plan is running. In order to limit how much this
    # pollutes the transport API we only handle the orchestrator transport here.
    # Since we call this function without resolving targets this will result
    # in the orchestrator transport always being initialized during plan runs.
    # For now that's ok.
    #
    # In the future if other transports need this or if we want a plan stack
    # we'll need to refactor.
    def start_plan(plan_context)
      transport('pcp').plan_context = plan_context
    end

    def finish_plan(plan_result)
      transport('pcp').finish_plan(plan_result)
    end

    def without_default_logging
      publish_event(type: :disable_default_output)
      yield
    ensure
      publish_event(type: :enable_default_output)
    end
  end
end