# $TESTING may be set to true before this file is loaded; when true,
# Rake::RemoteTask.set skips its reserved-name check.
$TESTING ||= false
# $TRACE mirrors rake's trace option. When set, the rsync and ssh command
# lines are echoed with +warn+ before they are executed.
$TRACE = Rake.application.options.trace
# Tracing also switches on Ruby's warnings.
$-w = true if $TRACE

# Defines one forwarding method per name in +methods+. Each generated
# method accepts any arguments and an optional block and passes them on
# to the method of the same name on +receiver+.
#
# +receiver+ is a String holding a Ruby expression (for example
# "Thread.current[:task]"). It is interpolated into the generated source,
# so it is re-evaluated on every call rather than once at definition time.
#
# NOTE: this uses +eval+ on the interpolated strings. Only call it with
# trusted, literal receiver expressions and method names.
#
# Returns the +methods+ array.
def export(receiver, *methods)
  methods.each do |method|
    eval "def #{method}(*args, &block); #{receiver}.#{method}(*args, &block); end"
  end
end

# Top-level helpers that forward to whatever is stored in
# Thread.current[:task] at the time of the call.
export "Thread.current[:task]", :get, :put, :rsync, :run, :sudo, :target_host
# Top-level helpers that forward to Rake::RemoteTask class methods.
export "Rake::RemoteTask",      :host, :remote_task, :role, :set

# Rake::RemoteTask is a subclass of Rake::Task that adds
# remote_actions that execute in parallel on multiple hosts via ssh.
class Rake::RemoteTask < Rake::Task
  # Base error class for all Rake::RemoteTask errors.
  class Error < RuntimeError; end

  # Raised when you have incorrect configuration.
  class ConfigurationError < Error; end

  # Raised when a remote command fails.
  class CommandFailedError < Error; end

  # Raised when an environment variable hasn't been set.
  class FetchError < Error; end

  # Roles currently being declared through the block form of ::role. They
  # are merged into the :roles option of every task created by
  # ::remote_task while the block is running.
  @@current_roles = []

  include Open4

  # Options for execution of this task. ::remote_task stores a Hash here
  # whose :roles entry selects the hosts the task runs on.
  attr_accessor :options

  # The host this task is running on during execution.
  attr_accessor :target_host

  # An Array of Actions this host will perform during execution. Use
  # enhance to add new actions to a task.
  attr_reader :remote_actions

  # The roles that are currently open via the block form of ::role.
  def self.current_roles
    @@current_roles
  end

  # Create a new task named +task_name+ attached to Rake::Application +app+.
  def initialize(task_name, app)
    super
    @remote_actions = []
  end

  # Add a local action to this task. This calls Rake::Task#enhance.
  alias_method :original_enhance, :enhance

  # Add remote action +block+ to this task with dependencies +deps+. See
  # Rake::Task#enhance.
  #
  # The block is wrapped in an Action and appended to remote_actions; it is
  # deliberately not handed to Rake::Task#enhance. Returns +self+.
  def enhance(deps = nil, &block)
    original_enhance(deps) # can't use super because block passed regardless.
    @remote_actions << Action.new(self, block) if block_given?
    self
  end

  # Execute this action. Local actions will be performed first, then remote
  # actions will be performed in parallel on each host configured for this
  # RemoteTask.
  #
  # Raises ConfigurationError when none of the task's roles has a host.
  def execute(args = nil)
    unless defined_target_hosts?
      raise(Rake::RemoteTask::ConfigurationError,
            "No target hosts specified on task #{self.name} for roles #{options[:roles].inspect}")
    end

    super args

    @remote_actions.each { |act| act.execute(target_hosts, self, args) }
  end

  # Pulls +files+ from the +target_host+ using rsync to +local_directory+.
  def get(local_directory, *files)
    rsync(files.map { |f| "#{target_host}:#{f}" }, local_directory)
  end

  # Copies a (usually generated) file to +remote_path+. The value returned
  # by the block is written to a tempfile which is then rsynced to
  # +remote_path+. You may specify an optional base_name for the tempfile
  # (aids in debugging).
  def put(remote_path, base_name = File.basename(remote_path))
    require 'tempfile'
    Tempfile.open(base_name) do |fp|
      fp.puts yield
      fp.flush # make sure the content is on disk before rsync reads it
      rsync(fp.path, "#{target_host}:#{remote_path}")
    end
  end

  # Use rsync to send +local+ to +remote+. Either side may be an Array of
  # paths; the argument list is flattened and nils are dropped.
  #
  # Raises CommandFailedError when rsync does not exit successfully.
  def rsync(local, remote)
    cmd = [rsync_cmd, rsync_flags, local, remote].flatten.compact
    cmdstr = cmd.join(' ')

    warn cmdstr if $TRACE

    success = system(*cmd)

    raise Rake::RemoteTask::CommandFailedError, "execution failed: #{cmdstr}" unless success
  end

  # Use ssh to execute +command+ on target_host. If +command+ uses sudo, the
  # sudo password will be prompted for then saved for subsequent sudo commands.
  #
  # Remote stdout and stderr are echoed to the local $stdout and $stderr as
  # they arrive. Returns everything read from both streams as one String.
  # Raises CommandFailedError when the command exits unsuccessfully.
  def run(command)
    cmd = [ssh_cmd, ssh_flags, target_host, command].flatten
    result = []

    trace = [ssh_cmd, ssh_flags, target_host, "'#{command}'"].flatten.join(' ')
    warn trace if $TRACE

    pid, inn, out, err = popen4(*cmd)

    inn.sync   = true
    streams    = [out, err]
    out_stream = {
      out => $stdout,
      err => $stderr,
    }

    # Handle process termination ourselves
    status = nil
    Thread.start do
      status = Process.waitpid2(pid).last
    end

    until streams.empty? do
      # don't busy loop
      selected, = select streams, nil, nil, 0.1

      next if selected.nil? || selected.empty?

      selected.each do |stream|
        if stream.eof? then
          # A stream is only retired once the child has been reaped, so the
          # loop cannot finish before +status+ has been assigned.
          streams.delete stream if status # we've quit, so no more writing
          next
        end

        data = stream.readpartial(1024)
        out_stream[stream].write data

        # Answer a sudo password prompt appearing on the remote stderr.
        if stream == err && data =~ sudo_prompt then
          inn.puts sudo_password
          data << "\n"
          $stderr.write "\n"
        end

        result << data
      end
    end

    unless status.success? then
      raise(Rake::RemoteTask::CommandFailedError,
            "execution failed with status #{status.exitstatus}: #{cmd.join ' '}")
    end

    result.join
  ensure
    # Best-effort cleanup: the pipes are nil if popen4 itself failed.
    inn.close rescue nil
    out.close rescue nil
    err.close rescue nil
  end

  # Returns an Array with every host configured.
  def self.all_hosts
    hosts_for(roles.keys)
  end

  # The default environment values. Used for resetting (mostly for
  # tests).
  def self.default_env
    @@default_env
  end

  # Hash of variable names flagged as :per_thread by ::set. Such
  # variables are re-evaluated on every fetch instead of being cached.
  def self.per_thread
    @@per_thread
  end

  # The remote task environment.
  def self.env
    @@env
  end

  # Fetches environment variable +name+ from the environment using
  # default +default+.
  #
  # A Proc value is called to obtain the result. Unless the variable is
  # per-thread, that result replaces the Proc in the environment so the
  # Proc runs only once. When +name+ is unset and +default+ is not nil,
  # +default+ is stored and returned. Raises FetchError when +name+ is
  # unset and no default is given.
  def self.fetch(name, default = nil)
    name = name.to_s if Symbol === name

    if @@env.has_key? name then
      protect_env(name) do
        value = @@env[name]
        # Shared lazy values are computed once and cached.
        value = @@env[name] = value.call if Proc === value && !per_thread[name]
        # Per-thread values (and Procs returned by Procs) are called
        # without caching the result.
        value = value.call if Proc === value
        value
      end
    elsif !default.nil?
      @@env[name] = default
    else
      raise Rake::RemoteTask::FetchError, "no value set for variable #{name.inspect}"
    end
  end

  # Fetches a set of options and turns it into a hash to be passed to
  # the application specific delegate tasks. Options whose value is
  # unset, nil or false are left out of the hash.
  def self.get_options_hash(*options)
    options_hash = {}
    options.each do |option|
      option_value = Rake::RemoteTask.fetch(option, false)
      options_hash[option] = option_value if option_value
    end
    options_hash
  end

  # Add host +host_name+ that belongs to +roles+. Extra arguments may
  # be specified for the host as a hash as the last argument.
  #
  # host is the inversion of role:
  #
  #   host 'db1.example.com', :db, :master_db
  #
  # Is equivalent to:
  #
  #   role :db, 'db1.example.com'
  #   role :master_db, 'db1.example.com'
  def self.host(host_name, *roles)
    opts = Hash === roles.last ? roles.pop : {}

    # Each role receives its own copy of the host options.
    roles.each do |role_name|
      role role_name, host_name, opts.dup
    end
  end

  # Returns a sorted Array of all unique hosts in +roles+.
  def self.hosts_for(*roles)
    roles.flatten.map { |r|
      self.roles[r].keys
    }.flatten.uniq.sort
  end

  # Declares variable +name+ (described by +desc+) whose default raises
  # ConfigurationError when it is fetched without having been set.
  def self.mandatory(name, desc) # :nodoc:
    self.set(name) do
      raise(Rake::RemoteTask::ConfigurationError,
            "Please specify the #{desc} via the #{name.inspect} variable")
    end
  end

  # Ensures exclusive access to +name+.
  def self.protect_env(name) # :nodoc:
    @@env_locks[name].synchronize do
      yield
    end
  end

  # Adds a remote task named +name+ with options +options+ that will
  # execute +block+. A trailing Hash argument is taken as the options; its
  # :roles entry is normalised to an Array and extended with the roles of
  # any enclosing ::role block. Returns the task.
  def self.remote_task(name, *args, &block)
    options = (Hash === args.last) ? args.pop : {}
    t = Rake::RemoteTask.define_task(name, *args, &block)
    options[:roles] = Array options[:roles]
    options[:roles] |= @@current_roles
    t.options = options
    t
  end

  # Ensures +name+ does not conflict with an existing method. Names
  # already present in the environment are never considered reserved.
  def self.reserved_name?(name) # :nodoc:
    !@@env.has_key?(name.to_s) && self.respond_to?(name)
  end

  # Resets all roles, tasks and environment variables to the defaults.
  def self.reset
    @@def_role_hash = {}                # official default role value
    @@env           = {}
    @@tasks         = {}
    @@roles         = Hash.new { |h,k| h[k] = @@def_role_hash }
    @@env_locks     = Hash.new { |h,k| h[k] = Mutex.new }

    @@default_env.each do |k,v|
      case v
      when Symbol, Integer, nil, true, false then
        # Immutable values are shared as-is. Integer replaces Fixnum,
        # which no longer exists in current Ruby versions.
        @@env[k] = v
      else
        # Everything else is copied so that mutating the live value does
        # not alter the recorded default.
        @@env[k] = v.dup
      end
    end
  end

  # Adds role +role_name+ with +host+ and +args+ for that host.
  #
  # When called with a block and no +host+, +role_name+ is pushed onto
  # current_roles for the duration of the block.
  #
  # Raises ArgumentError if a host is combined with a block, if no host is
  # given without a block, or if a host is nil or empty.
  def self.role(role_name, host = nil, args = {})
    if block_given? then
      raise ArgumentError, 'host not allowed with block' unless host.nil?

      begin
        current_roles << role_name
        yield
      ensure
        current_roles.delete role_name
      end
    else
      raise ArgumentError, 'host required' if host.nil?

      [*host].each do |hst|
        raise ArgumentError, "invalid host: #{hst}" if hst.nil? || hst.empty?
      end
      # Give the role its own hash the first time a host is added, so the
      # shared default role hash is never mutated.
      @@roles[role_name] = {} if @@def_role_hash.eql? @@roles[role_name]
      # NOTE(review): when +host+ is an Array the Array itself becomes the
      # key; hosts_for flattens it back out. Confirm this is intended.
      @@roles[role_name][host] = args
    end
  end

  # The configured roles. When no role has been defined yet, +domain+ is
  # registered as the host for the :app, :web and :db roles.
  def self.roles
    host domain, :app, :web, :db if @@roles.empty?

    @@roles
  end

  # Set environment variable +name+ to +value+ or +default_block+.
  #
  # If +default_block+ is defined, the block will be executed the
  # first time the variable is fetched, and the value will be used for
  # every subsequent fetch.
  #
  # Passing :per_thread as +value+ together with a block marks the variable
  # as per-thread, in which case the block runs on every fetch. A method
  # named +name+ that fetches the variable is defined on Object.
  #
  # Raises ArgumentError if both a value and a block are given (other than
  # :per_thread), or if +name+ is reserved and $TESTING is not set.
  def self.set(name, value = nil, &default_block)
    if value && default_block && value != :per_thread
      raise ArgumentError, "cannot provide both a value and a block"
    end

    if !$TESTING && Rake::RemoteTask.reserved_name?(name)
      raise ArgumentError, "cannot set reserved name: '#{name}'"
    end

    name = name.to_s

    Rake::RemoteTask.per_thread[name] = true if
      default_block && value == :per_thread

    Rake::RemoteTask.default_env[name] = Rake::RemoteTask.env[name] =
      default_block || value

    Object.send :define_method, name do
      Rake::RemoteTask.fetch name
    end
  end

  # Sets all the default values. Should only be called once. Use reset
  # if you need to restore values.
  def self.set_defaults
    @@default_env ||= {}
    @@per_thread  ||= {}
    self.reset

    mandatory :repository, "repository path"
    mandatory :deploy_to,  "deploy path"
    mandatory :domain,     "server domain"

    simple_set(:app_env,            "production",
               :deploy_timestamped, true,
               :keep_releases,      5,
               :rake_cmd,           "rake",
               :rsync_cmd,          "rsync",
               :rsync_flags,        ['-azP', '--delete'],
               :ssh_cmd,            "ssh",
               :ssh_flags,          [],
               :sudo_cmd,           "sudo",
               :sudo_flags,         ['-p Password:'],
               :sudo_prompt,        /^Password:/)

    set(:latest_release)     { deploy_timestamped ? File.join(releases_path, releases[-1]) : releases_path }
    set(:previous_release) do
      if deploy_timestamped
        if releases[-2]
          File.join(releases_path, releases[-2])
        else
          nil
        end
      else
        releases_path
      end
    end
    set(:release_name)       { deploy_timestamped ? Time.now.utc.strftime("%Y%m%d%H%M%S") : nil }
    set(:release_path)       { release_name ? File.join(releases_path, release_name) : releases_path }
    set(:releases)           { task.run("ls -x #{releases_path}").split.sort }

    set_path :current_path,        "current"
    set_path :releases_path,       "releases"
    set_path :scm_path,            "scm"
    set_path :shared_path,         "shared"
    set_path :shared_config_path,  File.join("shared", "config")

    # Prompt for the sudo password on the terminal with echo disabled. The
    # saved terminal state is restored even if reading fails.
    set(:sudo_password) do
      state = `stty -g`

      raise Rake::RemoteTask::Error, "stty(1) not found" unless $?.success?

      begin
        system "stty -echo"
        $stdout.print "sudo password: "
        $stdout.flush
        password = $stdin.gets
        $stdout.puts
      ensure
        system "stty #{state}"
      end
      password
    end
  end

  # Declares variable +name+ as +subdir+ joined onto deploy_to.
  def self.set_path(name, subdir) # :nodoc:
    set(name) { File.join(deploy_to, subdir) }
  end

  # Sets several variables at once from a flat name, value, name, value...
  # argument list.
  def self.simple_set(*args) # :nodoc:
    args = Hash[*args]
    args.each do |k, v|
      set k, v
    end
  end

  # The Rake::RemoteTask executing in this Thread.
  def self.task
    Thread.current[:task]
  end

  # The configured Rake::RemoteTasks.
  def self.tasks
    @@tasks
  end

  # Execute +command+ under sudo using run.
  def sudo(command)
    run [sudo_cmd, sudo_flags, command].flatten.compact.join(" ")
  end

  # The hosts this task will execute on. The hosts are determined from
  # the role this task belongs to.
  #
  # The target hosts may be overridden by providing a comma-separated
  # list of hosts to the HOSTS environment variable:
  #
  #   rake my_task HOSTS=app1.example.com,app2.example.com
  def target_hosts
    if hosts = ENV["HOSTS"] then
      hosts.strip.gsub(/\s+/, '').split(",")
    else
      roles = Array options[:roles]

      if roles.empty? then
        Rake::RemoteTask.all_hosts
      else
        Rake::RemoteTask.hosts_for roles
      end
    end
  end

  # Similar to target_hosts, but returns true if user defined any hosts, even
  # an empty list.
  def defined_target_hosts?
    return true if ENV["HOSTS"]

    roles = Array options[:roles]
    return true if roles.empty?

    # A role counts as defined once its entry differs from the shared
    # default role hash.
    roles.flatten.any? { |r| !@@def_role_hash.eql?(Rake::RemoteTask.roles[r]) }
  end

  # Action is used to run a task's remote_actions in parallel on each
  # of its hosts. Actions are created automatically in
  # Rake::RemoteTask#enhance.
  class Action

    # The task this action is attached to.
    attr_reader :task

    # The block this action will execute.
    attr_reader :block

    # A ThreadGroup holding one thread for each host this action
    # executes on.
    attr_reader :workers

    # Creates a new Action that will run +block+ for +task+.
    def initialize(task, block)
      @task  = task
      @block = block
      @workers = ThreadGroup.new
    end

    def ==(other) # :nodoc:
      return false unless Action === other
      block == other.block && task == other.task
    end

    # Execute this action on +hosts+ in parallel. Returns when block
    # has completed for each host.
    #
    # Every host gets its own clone of +task+ with target_host set, and
    # that clone is exposed as Thread.current[:task] while the block runs.
    # Blocks of arity 1 receive only the task; all others receive the task
    # and +args+.
    def execute(hosts, task, args)
      hosts.each do |host|
        host_task = task.clone
        host_task.target_host = host

        thread = Thread.new(host_task) do |current|
          Thread.current.abort_on_exception = true
          Thread.current[:task] = current
          begin
            case block.arity
            when 1
              block.call current
            else
              block.call current, args
            end
          ensure
            # Clear the thread-local even when the block raises.
            Thread.current[:task] = nil
          end
        end
        @workers.add thread
      end

      @workers.list.each { |thr| thr.join }
    end
  end
end

# Install the default variables as soon as this file is loaded.
Rake::RemoteTask.set_defaults