package s3.website

import s3.website.ErrorReport.errorMessage
import s3.website.model._
import com.amazonaws.services.s3.{AmazonS3, AmazonS3Client}
import com.amazonaws.services.s3.model._
import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContextExecutor, Future}
import com.amazonaws.services.s3.model.StorageClass.ReducedRedundancy
import s3.website.ByteHelper.humanReadableByteCount
import scala.concurrent.duration.TimeUnit
import java.util.concurrent.TimeUnit.SECONDS
import s3.website.S3.SuccessfulUpload.humanizeUploadSpeed
import java.io.FileInputStream
import s3.website.model.Config.awsCredentials
import scala.util.Try

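// S3 operations for a push: uploading files and redirects, deleting keys, and listing the bucket's contents.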
object S3 {

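  // Thin wrappers that feed the two push source types into uploadToS3.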
  def uploadRedirect(redirect: Redirect, a: Attempt = 1)
            (implicit config: Config, s3Settings: S3Setting, pushOptions: PushOptions, executor: ExecutionContextExecutor, logger: Logger) =
    uploadToS3(Right(redirect), a)

  def uploadFile(up: Upload, a: Attempt = 1)
                    (implicit config: Config, s3Settings: S3Setting, pushOptions: PushOptions, executor: ExecutionContextExecutor, logger: Logger) =
    uploadToS3(Left(up), a)

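  // Pushes one item (a local file or a redirect object) to the bucket.
  // A dry run skips the actual PUT; failed attempts are retried via the retry helper.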
  def uploadToS3(source: Either[Upload, Redirect], a: Attempt = 1)
            (implicit config: Config, s3Settings: S3Setting, pushOptions: PushOptions, executor: ExecutionContextExecutor, logger: Logger):
  Future[Either[FailedUpload, SuccessfulUpload]] =
    Future {
      val putObjectRequest = toPutObjectRequest(source).get
      val uploadDuration =
        if (pushOptions.dryRun) None
        else Some(recordUploadDuration(putObjectRequest, s3Settings.s3Client(config) putObject putObjectRequest))
      val report = SuccessfulUpload(
        source.fold(_.s3Key, _.s3Key),
        source.fold(
          upload => Left(SuccessfulNewOrCreatedDetails(upload.uploadType, upload.uploadFile.get.length(), uploadDuration)),
          redirect  => Right(SuccessfulRedirectDetails(redirect.uploadType, redirect.redirectTarget))
        ),
        putObjectRequest
      )
      logger.info(report)
      Right(report)
    } recoverWith retry(a)(
      createFailureReport = error => FailedUpload(source.fold(_.s3Key, _.s3Key), error),
      retryAction  = newAttempt => this.uploadToS3(source, newAttempt)
    )

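  // Deletes one key from the bucket (skipped on a dry run), retrying failures like uploads.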
  def delete(s3Key: S3Key,  a: Attempt = 1)
            (implicit config: Config, s3Settings: S3Setting, pushOptions: PushOptions, executor: ExecutionContextExecutor, logger: Logger):
  Future[Either[FailedDelete, SuccessfulDelete]] =
    Future {
      if (!pushOptions.dryRun) s3Settings.s3Client(config) deleteObject(config.s3_bucket, s3Key)
      val report = SuccessfulDelete(s3Key)
      logger.info(report)
      Right(report)
    } recoverWith retry(a)(
      createFailureReport = error => FailedDelete(s3Key, error),
      retryAction  = newAttempt => this.delete(s3Key, newAttempt)
    )

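  // Builds the PutObjectRequest: for uploads, streams the local file with its content type, encoding and
  // cache headers; for redirects, creates a zero-byte object whose redirect location points to the target.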
  def toPutObjectRequest(source: Either[Upload, Redirect])(implicit config: Config): Try[PutObjectRequest] =
    source.fold(
      upload =>
        for {
          uploadFile <- upload.uploadFile
          contentType <- upload.contentType
        } yield {
          val md = new ObjectMetadata()
          md setContentLength uploadFile.length
          md setContentType contentType
          upload.encodingOnS3.map(_ => "gzip") foreach md.setContentEncoding
          upload.maxAge foreach { seconds =>
            md.setCacheControl(
              if (seconds == 0)
                s"no-cache; max-age=$seconds"
              else
                s"max-age=$seconds"
            )
          }
          val req = new PutObjectRequest(config.s3_bucket, upload.s3Key, new FileInputStream(uploadFile), md)
          config.s3_reduced_redundancy.filter(_ == true) foreach (_ => req setStorageClass ReducedRedundancy)
          req
        }
      ,
      redirect => {
        val req = new PutObjectRequest(config.s3_bucket, redirect.s3Key, redirect.redirectTarget)
        req.setMetadata({
          val md = new ObjectMetadata()
          md.setContentLength(0) // Otherwise the AWS SDK will log a warning
          /*
           * Instruct HTTP clients to always re-check the redirect. The 301 status code may override this, though.
           * This is for the sake of simplicity.
           */
          md.setCacheControl("max-age=0, no-cache")
          md
        })
        Try(req)
      }
    )

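  // Runs the by-name upload action `f` and returns how long it took, in milliseconds.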
  def recordUploadDuration(putObjectRequest: PutObjectRequest, f: => Unit): UploadDuration = {
    val start = System.currentTimeMillis()
    f
    System.currentTimeMillis - start
  }

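  // Default S3ClientProvider, used unless S3Setting is given another client factory (e.g. in tests).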
  def awsS3Client(config: Config) = new AmazonS3Client(awsCredentials(config))

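  // Lists every object in the bucket, following the listing marker until S3 reports no further pages.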
  def resolveS3Files(nextMarker: Option[String] = None, alreadyResolved: Seq[S3File] = Nil,  attempt: Attempt = 1)
                              (implicit config: Config, s3Settings: S3Setting, ec: ExecutionContextExecutor, logger: Logger, pushOptions: PushOptions):
  Future[Either[ErrorReport, Seq[S3File]]] = Future {
    logger.debug(
      nextMarker.fold("Querying S3 files")(marker => s"Querying more S3 files (starting from $marker)")
    )
    val objects: ObjectListing = s3Settings.s3Client(config).listObjects({
      val req = new ListObjectsRequest()
      req.setBucketName(config.s3_bucket)
      nextMarker.foreach(req.setMarker)
      req
    })
    objects
  } flatMap { (objects: ObjectListing) =>
    val s3Files = alreadyResolved ++ (objects.getObjectSummaries.toIndexedSeq map (S3File(_)))
    Option(objects.getNextMarker)
      .fold(Future(Right(s3Files)): Future[Either[ErrorReport, Seq[S3File]]]) // We've received all the S3 keys from the bucket
      { nextMarker => // There are more S3 keys on the bucket. Fetch them.
        resolveS3Files(Some(nextMarker), s3Files, attempt = attempt)
      }
  } recoverWith retry(attempt)(
    createFailureReport = error => ErrorReport(s"Failed to fetch an object listing (${error.getMessage})"),
    retryAction = nextAttempt => resolveS3Files(nextMarker, alreadyResolved, nextAttempt)
  )

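  // Shorthand types for combining bucket-listing results with the pending update operations of a push.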
  type S3FilesAndUpdates = (ErrorOrS3Files, UpdateFutures)
  type S3FilesAndUpdatesFuture = Future[S3FilesAndUpdates]
  type ErrorOrS3FilesAndUpdates = Future[Either[ErrorReport, S3FilesAndUpdates]]
  type UpdateFutures = Seq[Either[ErrorReport, Future[PushErrorOrSuccess]]]
  type ErrorOrS3Files = Either[ErrorReport, Seq[S3File]]

  sealed trait PushFailureReport extends ErrorReport
  sealed trait PushSuccessReport extends SuccessReport {
    def s3Key: S3Key
  }

  case class SuccessfulRedirectDetails(uploadType: UploadType, redirectTarget: String)
  case class SuccessfulNewOrCreatedDetails(uploadType: UploadType, uploadSize: Long, uploadDuration: Option[Long])

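  // Report for a successfully pushed item; `details` is Left for file uploads and Right for redirects.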
  case class SuccessfulUpload(s3Key: S3Key,
                              details: Either[SuccessfulNewOrCreatedDetails, SuccessfulRedirectDetails],
                              putObjectRequest: PutObjectRequest)
                             (implicit pushOptions: PushOptions, logger: Logger) extends PushSuccessReport {
    def reportMessage =
      details.fold(
        newOrCreatedDetails => s"${newOrCreatedDetails.uploadType.pushAction.renderVerb} $s3Key ($reportDetails)",
        redirectDetails     => s"${redirectDetails.uploadType.pushAction.renderVerb} $s3Key to ${redirectDetails.redirectTarget}"
      )

    def reportDetails = {
      val md = putObjectRequest.getMetadata
      val detailFragments: Seq[Option[String]] =
        (
          md.getCacheControl ::
          md.getContentType ::
          md.getContentEncoding ::
          putObjectRequest.getStorageClass ::
          Nil map (Option(_)) // AWS SDK may return nulls
        ) :+ uploadSizeForHumans :+ uploadSpeedForHumans
      detailFragments.collect {
        case Some(detailFragment) => detailFragment
      }.mkString(" | ")
    }

    lazy val uploadSize = details.fold(
      newOrCreatedDetails => Some(newOrCreatedDetails.uploadSize),
      redirectDetails     => None
    )

    lazy val uploadSizeForHumans: Option[String] = uploadSize filter (_ => logger.verboseOutput) map humanReadableByteCount

    lazy val uploadSpeedForHumans: Option[String] =
      (for {
        dataSize <- uploadSize
        duration <- details.left.map(_.uploadDuration).left.toOption.flatten
      } yield {
        humanizeUploadSpeed(dataSize, duration)
      }) flatMap identity filter (_ => logger.verboseOutput)
  }
  
  object SuccessfulUpload {
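    // Derives a human-readable transfer rate ("<size>/s") from the uploaded byte count and the recorded durations.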
    def humanizeUploadSpeed(uploadedBytes: Long, uploadDurations: UploadDuration*): Option[String] = {
      val totalDurationMillis = uploadDurations.foldLeft(0L){ (memo, duration) =>
        memo + duration
      }
      if (totalDurationMillis > 0) {
        // Speed = total bytes / total time; multiply before dividing so integer division keeps the precision.
        val bytesPerSecond = uploadedBytes * 1000 / totalDurationMillis
        Some(humanReadableByteCount(bytesPerSecond) + "/s")
      } else {
        None
      }
    }
  }

  case class SuccessfulDelete(s3Key: S3Key)(implicit pushOptions: PushOptions) extends PushSuccessReport {
    def reportMessage = s"${Deleted.renderVerb} $s3Key"
  }

  case class FailedUpload(s3Key: S3Key, error: Throwable)(implicit logger: Logger) extends PushFailureReport {
    def reportMessage = errorMessage(s"Failed to upload $s3Key", error)
  }

  case class FailedDelete(s3Key: S3Key, error: Throwable)(implicit logger: Logger) extends PushFailureReport {
    def reportMessage = errorMessage(s"Failed to delete $s3Key", error)
  }

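  // Pluggable S3 client factory; S3Setting supplies it together with the retry time unit required by RetrySetting.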
  type S3ClientProvider = (Config) => AmazonS3

  case class S3Setting(
    s3Client: S3ClientProvider = S3.awsS3Client,
    retryTimeUnit: TimeUnit = SECONDS
  ) extends RetrySetting
}