module R10K
  module ContentSynchronizer

    def self.serial_accept(modules, visitor, loader)
      visitor.visit(:puppetfile, loader) do
        serial_sync(modules)
      end
    end

    def self.serial_sync(modules)
      modules.each do |mod|
        mod.sync
      end
    end

    def self.concurrent_accept(modules, visitor, loader, pool_size, logger)
      mods_queue = modules_visit_queue(modules, visitor, loader)
      sync_queue(mods_queue, pool_size, logger)
    end

    def self.concurrent_sync(modules, pool_size, logger)
      mods_queue = modules_sync_queue(modules)
      sync_queue(mods_queue, pool_size, logger)
    end

    def self.sync_queue(mods_queue, pool_size, logger)
      logger.debug _("Updating modules with %{pool_size} threads") % {pool_size: pool_size}
      thread_pool = pool_size.times.map { sync_thread(mods_queue, logger) }
      thread_exception = nil

      # If any threads raise an exception the deployment is considered a failure.
      # In that event clear the queue, wait for other threads to finish their
      # current work, then re-raise the first exception caught.
      begin
        thread_pool.each(&:join)
      rescue => e
        logger.error _("Error during concurrent deploy of a module: %{message}") % {message: e.message}
        mods_queue.clear
        thread_exception ||= e
        retry
      ensure
        raise thread_exception unless thread_exception.nil?
      end
    end

    def self.modules_visit_queue(modules, visitor, loader)
      Queue.new.tap do |queue|
        visitor.visit(:puppetfile, loader) do
          enqueue_modules(queue, modules)
        end
      end
    end

    def self.modules_sync_queue(modules)
      Queue.new.tap do |queue|
        enqueue_modules(queue, modules)
      end
    end

    def self.enqueue_modules(queue, modules)
      modules_by_cachedir = modules.group_by { |mod| mod.cachedir }
      modules_without_vcs_cachedir = modules_by_cachedir.delete(:none) || []

      modules_without_vcs_cachedir.each {|mod| queue << Array(mod) }
      modules_by_cachedir.values.each {|mods| queue << mods }
    end

    def self.sync_thread(mods_queue, logger)
      Thread.new do
        begin
          while mods = mods_queue.pop(true) do
            mods.each { |mod| mod.sync }
          end
        rescue ThreadError => e
          logger.debug _("Module thread %{id} exiting: %{message}") % {message: e.message, id: Thread.current.object_id}
          Thread.exit
        rescue => e
          Thread.main.raise(e)
        end
      end
    end
  end
end