module Filbunke
  # Client for a filbunke repository: asks the repository's HTTP endpoint for
  # the files changed since a checkpoint, brings the copies under
  # repository.local_path up to date, and hands the resulting batches to the
  # configured callbacks.
  class Client

    attr_reader :repository

    UPDATES_ACTION = 'updates'.freeze
    FILES_ACTION = 'files'.freeze
    LAST_CHECKPOINT_ACTION = 'last_checkpoint'.freeze
    TOUCH_ACTION = 'touch'.freeze
    URL_KEY = 'url'.freeze
    FROM_CHECKPOINT_KEY = 'from_checkpoint'.freeze
    HASH_KEY = 'hash'.freeze
    # Characters of a download URL that are percent-encoded: anything other
    # than ".", ":", "/", word characters and "-". This must be a Regexp; a
    # String would be treated as the literal set of characters to escape.
    URI_UNSAFE_CHARACTERS = %r{[^.:/\w-]}

    # @param repository repository settings (name, host, port, local_path,
    #   hydra_concurrency, user/pass, hadoop_binary are read by this class)
    # @param logger Logger-compatible object
    # @param callbacks objects responding to on_update_batch,
    #   on_no_change_batch and on_delete_batch
    # @param failed_request_log_file_name optional file to which failed
    #   registration requests are appended as curl commands
    def initialize(repository, logger, callbacks = [], failed_request_log_file_name = nil)
      @repository = repository
      @logger = logger
      @callbacks = callbacks
      @failed_request_log_file_name = failed_request_log_file_name
      @hydra = Typhoeus::Hydra.new(:max_concurrency => @repository.hydra_concurrency)

      @logger.info "initialized client for repository '#{@repository.name}';  #{@repository.inspect}"
    end

    # Fetches the files changed since +last_checkpoint+ and updates the local
    # copies: DELETED files are removed, files without a hash or whose local
    # MD5 differs are downloaded (http/https via Typhoeus, hdfs via the hadoop
    # binary), and the rest are left untouched. Yields every file whose update
    # was started successfully. Callbacks run only if nothing failed.
    #
    # @param last_checkpoint checkpoint to request updates from
    # @return the server's new checkpoint (or +last_checkpoint+ when the
    #   server sent none) on success; +last_checkpoint+ when any download or
    #   callback failed, so re-requesting from the returned value retries.
    def with_updated_files(last_checkpoint)
      updates = get_updated_file_list(last_checkpoint)
      updated_files = updates["files"] || []
      new_checkpoint = updates["checkpoint"]
      failure = false

      unless updated_files.empty?
        @logger.info "Updating repository: #{@repository.name}: #{updated_files.size} files. Checkpoint: #{last_checkpoint} ==> #{new_checkpoint}"
      end

      @async_requests = []
      # Filled by the async on_complete handlers (bad status, failed write, ...).
      @failed_downloads = []

      callbacks_on_update = []
      callbacks_on_no_change = []
      callbacks_on_delete = []

      updated_files.each do |raw_file|
        file = File.new(raw_file)
        local_file_path = ::File.join(@repository.local_path, file.path)
        entry = OpenStruct.new(:file => file, :local_file_path => local_file_path)

        if file.state == "DELETED"
          delete_file!(local_file_path)
          callbacks_on_delete << entry
        elsif !file_needs_update?(file, local_file_path)
          @logger.debug "File exists with correct hash: #{local_file_path}"
          callbacks_on_no_change << entry
        elsif update_file!(file, local_file_path)
          # NOTE: for http(s) URLs update_file! only queues the download; the
          # file is written during @hydra.run below, i.e. after this yield.
          yield file
          callbacks_on_update << entry
        else
          @logger.error "Unable to get file #{file.url} ==> #{file.path}!"
          failure = true
        end
      end
      # Performs all queued http(s) downloads; their on_complete handlers write the files.
      @hydra.run

      failure ||= @async_requests.any? { |request| async_request_failed?(request) }
      failure ||= !@failed_downloads.empty?

      if failure
        @logger.error "FAILED to update files for #{@repository.name}  last_checkpoint = #{last_checkpoint}"
        return last_checkpoint
      end

      @logger.info "Done fetching files for #{@repository.name}, processing callbacks..."
      begin
        run_callbacks_delete(callbacks_on_delete)
        run_callbacks(callbacks_on_update)
        run_callbacks_no_change(callbacks_on_no_change)
        new_checkpoint || last_checkpoint
      rescue StandardError => e
        msg = ["Callbacks failed to run; #{e.class} - #{e.message}", *e.backtrace].join("\n\t")
        @logger.error "FAILED to update files for #{@repository.name}  last_checkpoint = #{last_checkpoint}; #{msg}"
        last_checkpoint
      end
    end

    # Same as with_updated_files, without a per-file block.
    def update_files!(last_checkpoint)
      with_updated_files(last_checkpoint) {}
    end

    # Registers +path+ in this repository as available at +url+ (with an
    # optional content +hash+) via an HTTP PUT.
    #
    # @raise [RuntimeError] unless the server answers 204; any error is first
    #   appended to the failed-request log (if configured) and then re-raised.
    def register_updated_file!(path, url, hash = nil)
      send_registration!(Net::HTTP::Put, updated_file_path(path, url, hash), "Failed to register updated file: #{path}")
    end

    # Registers +path+ in this repository as deleted via an HTTP DELETE.
    #
    # @raise [RuntimeError] unless the server answers 204; any error is first
    #   appended to the failed-request log (if configured) and then re-raised.
    def register_deleted_file!(path)
      send_registration!(Net::HTTP::Delete, "/#{FILES_ACTION}/#{@repository.name}/#{path}", "Failed to register deleted file: #{path}")
    end

    # Sends a PUT to the repository's touch endpoint.
    #
    # @raise [RuntimeError] unless the server answers 204
    def touch_repository!
      touch_path = "/#{UPDATES_ACTION}/#{@repository.name}/#{TOUCH_ACTION}"
      Net::HTTP.new(@repository.host, @repository.port).start do |http|
        response = http.request(Net::HTTP::Put.new(touch_path))
        raise "Failed to touch repository: #{@repository.name}" unless response.code.to_i == 204
      end
    end

    # @return [Integer] the repository's last checkpoint as reported by the server
    # @raise [RuntimeError] unless the server answers 200
    def last_checkpoint
      last_checkpoint_path = "/#{UPDATES_ACTION}/#{@repository.name}/#{LAST_CHECKPOINT_ACTION}"
      Net::HTTP.new(@repository.host, @repository.port).start do |http|
        response = http.request(Net::HTTP::Get.new(last_checkpoint_path))
        raise "Failed to get last checkpoint for repository: #{@repository.name}" unless response.code.to_i == 200
        # Net::HTTP#start returns the block's value.
        response.body.chomp.to_i
      end
    end

    private

    # Appends a timestamped error line and a replayable command to the
    # failed-request log; no-op when no log file was configured.
    def log_failed_request(failed_request_command, e)
      return unless @failed_request_log_file_name
      ::File.open(@failed_request_log_file_name, 'a') do |f|
        f.puts("# #{Time.now}: #{e.message if e}")
        f.puts(failed_request_command)
      end
    end

    # Builds the PUT path used by register_updated_file!. Query values are
    # percent-encoded so URLs containing "&", "?", "#" or spaces survive.
    def updated_file_path(path, url, hash = nil)
      register_path = "/#{FILES_ACTION}/#{@repository.name}/#{path}?#{URL_KEY}=#{URI.encode_www_form_component(url)}"
      register_path += "&#{HASH_KEY}=#{URI.encode_www_form_component(hash)}" if hash
      register_path
    end

    # Sends a body-less +request_class+ request to +register_path+ and raises
    # +failure_message+ unless the answer is 204. Any error is logged as an
    # equivalent curl command before being re-raised unchanged.
    def send_registration!(request_class, register_path, failure_message)
      Net::HTTP.new(@repository.host, @repository.port).start do |http|
        response = http.request(request_class.new(register_path))
        raise failure_message unless response.code.to_i == 204
      end
    rescue StandardError => e
      log_failed_request(%Q{curl -X#{request_class::METHOD} "http://#{@repository.host}:#{@repository.port}#{register_path}"}, e)
      raise
    end

    # Dispatches on the URL scheme.
    #
    # @return [Boolean] whether the update was (started) successfully
    # @raise [RuntimeError] for schemes other than http, https and hdfs
    def update_file!(file, local_file_path)
      case file.url
      when %r{\Ahttps?://}
        update_http_file!(file, local_file_path)
      when %r{\Ahdfs://}
        update_hdfs_file!(file, local_file_path)
      else
        raise "Unsupported protocol for file: #{file.inspect}"
      end
    end

    def run_callbacks(files)
      @callbacks.each { |callback| callback.on_update_batch(files) }
    end

    def run_callbacks_no_change(files)
      @callbacks.each { |callback| callback.on_no_change_batch(files) }
    end

    def run_callbacks_delete(files)
      @callbacks.each { |callback| callback.on_delete_batch(files) }
    end

    # True when +file+ carries no hash, the local copy is missing, or the
    # local copy's MD5 hex digest differs from +file.hash+.
    def file_needs_update?(file, local_file_path)
      return true if file.hash.nil? || file.hash == ""
      return true unless ::File.exist?(local_file_path)

      # Streams the file in binary mode rather than reading it whole as text.
      Digest::MD5.file(local_file_path).hexdigest != file.hash
    end

    # GETs the list of updates since +last_checkpoint+.
    #
    # @return [Hash] the parsed JSON object, or {} on any error (connection,
    #   non-200 status, unparsable or non-object body); errors are logged.
    def get_updated_file_list(last_checkpoint)
      updates_path = "/#{UPDATES_ACTION}/#{@repository.name}?#{FROM_CHECKPOINT_KEY}=#{last_checkpoint}"
      updates_http = Net::HTTP.new(@repository.host, @repository.port)
      updates_http.read_timeout = 300 # default is 60 seconds
      updates_http.start do |http|
        begin
          @logger.info "Fetching updated file list from #{updates_path}"
          response = http.request(Net::HTTP::Get.new(updates_path))
          if response.code.to_i != 200
            @logger.error "Failed to download updates for #{@repository.name}, error code = #{response.code}"
            {}
          else
            updates = JSON.parse(response.body)
            unless updates.is_a?(Hash)
              @logger.error "Unexpected update list for #{@repository.name}: #{updates.class}"
              updates = {}
            end
            updates
          end
        rescue StandardError => e
          @logger.error "Error getting file list from http://#{@repository.host}:#{@repository.port}#{updates_path}: #{e.message}! Retrying later.."
          {}
        end
      end
    rescue StandardError => e
      @logger.error "Unable to create HTTP connection to #{@repository.host}:#{@repository.port} (#{e.message})!"
      {}
    end

    # True (and logged) when a queued request got no response or a non-200 one.
    def async_request_failed?(request)
      response = request.response
      failed = response.nil? || response.code.to_i != 200
      @logger.warn "request did not handle response: #{request.inspect}" if failed
      failed
    end

    # Queues an asynchronous download of +file.url+ on the hydra; the body is
    # written to +local_file_path+ when the hydra runs.
    #
    # @return [Boolean] false only if the request could not be built/queued
    def update_http_file!(file, local_file_path)
      request_options = { :follow_location => true }
      if @repository.user
        request_options[:username] = @repository.user
        request_options[:password] = @repository.pass
      end
      async_request = Typhoeus::Request.new(escape_url(file.url), request_options)
      async_request.on_complete do |response|
        success = handle_http_response(file, local_file_path, response)
        # A 200 status alone would hide a failed local write; record it here.
        @failed_downloads << file unless success
        # Block value: presumably surfaced as handled_response by Typhoeus —
        # verify against the Typhoeus version in use.
        success
      end
      @hydra.queue async_request
      @async_requests << async_request
      true
    rescue StandardError => e
      msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
      @logger.error "Failed to update file #{file.url}: #{msg}"
      false
    end

    # Writes a 200 response body to +local_file_path+.
    #
    # @return [Boolean] true only if the status was 200 and the write succeeded
    def handle_http_response(file, local_file_path, response)
      if response.code.to_i == 200
        write_file!(local_file_path, response.body)
      else
        body_if_error = response.code.to_i >= 500 ? ", body = #{response.body}" : ""
        @logger.warn "Failed to update file #{file.url}, got status code = #{response.code}#{body_if_error}"
        false
      end
    rescue StandardError => e
      msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
      @logger.error "Failed to update file #{file.url}: #{msg}"
      false
    end

    # Percent-encodes (byte-wise, upper-case hex) every character matched by
    # URI_UNSAFE_CHARACTERS. Replaces URI.encode, which Ruby 3.0 removed.
    def escape_url(url)
      url.gsub(URI_UNSAFE_CHARACTERS) do |char|
        char.each_byte.map { |byte| format('%%%02X', byte) }.join
      end
    end

    # Copies +file.url+ out of HDFS via "<hadoop_binary> dfs -copyToLocal"
    # into "<local_file_path>.tmp" and then moves it into place.
    #
    # @return [Boolean] whether the local file was replaced
    def update_hdfs_file!(file, local_file_path)
      tmp_path = "#{local_file_path}.tmp"
      ::FileUtils.mkdir_p(::File.dirname(local_file_path))
      ::FileUtils.rm_f(tmp_path)
      # Drop the authority (hdfs://host/path -> hdfs:///path). Non-destructive:
      # file.url is still used for logging and is handed to callbacks.
      url = file.url.sub(%r{hdfs://[^/]*}, 'hdfs://')
      # NOTE(review): a single command string may be run through a shell, so
      # paths with spaces or shell metacharacters break (or execute). Kept as a
      # string because hadoop_binary may carry its own arguments.
      hdfs_cmd = "#{@repository.hadoop_binary} dfs -copyToLocal #{url} #{tmp_path}"

      pid, stdin, stdout, stderr = Open4::popen4 hdfs_cmd
      hdfs_errors = begin
        stdin.close
        # Drain both pipes: a full, unread pipe would block the child forever.
        stderr_reader = Thread.new { stderr.read }
        stdout.read
        stderr_reader.value
      ensure
        [stdin, stdout, stderr].each { |io| io.close unless io.closed? }
      end
      _, status = Process::waitpid2 pid

      unless status.success?
        details = hdfs_errors.to_s.strip
        details = details.empty? ? "" : ": #{details}"
        @logger.error "Failed to update hdfs file #{file.url}! Unable to execute #{hdfs_cmd}#{details}"
        return false
      end

      begin
        ::FileUtils.mv tmp_path, local_file_path
        true
      rescue StandardError => e
        msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
        @logger.error "Failed to move hdfs file #{file.url}: #{msg}"
        false
      end
    rescue StandardError => e
      msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
      @logger.error "Failed to update hdfs file #{file.url}: #{msg}"
      false
    end

    # Writes +contents+ (binary-safe) to "<file_path>.tmp" and renames it over
    # +file_path+. Errors from creating the parent directory propagate.
    #
    # @return [Boolean] whether the write and move succeeded
    def write_file!(file_path, contents)
      tmp_path = "#{file_path}.tmp"
      ::FileUtils.mkdir_p(::File.dirname(file_path))
      @logger.debug("Updating: #{file_path}")
      begin
        ::File.open(tmp_path, 'wb') { |file| file.write(contents) }
        ::FileUtils.mv tmp_path, file_path
        true
      rescue StandardError => e
        msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
        @logger.error "Failed to move file #{file_path}: #{msg}"
        false
      end
    end

    # Deletes +file_path+ if it exists.
    def delete_file!(file_path)
      return unless ::File.exist?(file_path)
      @logger.debug("Deleting: #{file_path}")
      ::File.delete(file_path)
    end

  end
end