require 'zlib'
require 'stringio'
require 'thread'
require 'pathname'
require 'aws-sdk'
require 'mime-types'

# You can either use the block syntax, or:
#  * instantiate the class
#  * queue data to be published with push
#  * call run to actually upload the data to S3
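#
# A sketch of that non-block path (the bucket name, paths, and option values
# here are illustrative, not part of the library):
#
#   publisher = S3Publisher.new('my-bucket', workers: 5, base_path: 'reports')
#   publisher.push('daily.csv', file: '/tmp/daily.csv', ttl: 60)
#   publisher.push('notes.txt', data: 'some text', gzip: false)
#   publisher.run  # blocks until every queued upload has been attempted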
class S3Publisher

  attr_reader :bucket_name, :base_path, :logger, :workers_to_use

  # Block style. run is called for you on block close.
  #  S3Publisher.publish('my-bucket') do |p|
  #    p.push('test.txt', data: '123abc')
  #  end
  def self.publish bucket_name, opts={}
    p = self.new(bucket_name, opts)
    yield(p)
    p.run
  end

  # @param [String] bucket_name
  # @option opts [String] :base_path Path prepended to the supplied file_name on upload
  # @option opts [Integer] :workers Number of threads to use when pushing to S3. Defaults to 3.
  # @option opts [Object] :logger A logger object to receive 'uploaded' messages. Defaults to STDOUT.
  #
  # Additional keys will be passed through to the Aws::S3::Client init, including:
  # @option opts [String] :region AWS region to use, if different from the global Aws config
  # @option opts [Object] :credentials Credentials (:access_key_id, :secret_access_key, and :session_token), if different from the global Aws config
  # See http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Client.html#initialize-instance_method for full details.
  def initialize bucket_name, opts={}
    @publish_queue  = Queue.new
    @workers_to_use = opts.delete(:workers) || 3
    @logger         = opts.delete(:logger) || $stdout
    @bucket_name, @base_path = bucket_name, opts.delete(:base_path)
    @s3 = Aws::S3::Client.new(opts)
  end

  # Queues a file to be published.
  # You can provide :data as a string, or a path to a file with :file.
  # :file references won't be read until publish-time, reducing memory overhead.
  #
  # @param [String] key_name The name of the file on S3. base_path will be prepended if supplied.
  # @option opts [String] :data a string to be published
  # @option opts [String] :file path to a file to publish
  # @option opts [Boolean] :gzip gzip file contents? Defaults to true.
  # @option opts [Integer] :ttl TTL in seconds for the Cache-Control header. Defaults to 5.
  # @option opts [String] :cache_control specify the Cache-Control header directly if you don't like the default
  # @option opts [String] :content_type no need to specify if the default based on the file extension is okay,
  #                       but if you need to force one, provide your own string (e.g. 'text/xml').
  def push key_name, opts={}
    write_opts = { acl: 'public-read' }

    key_name = "#{base_path}/#{key_name}" unless base_path.nil?

    # Set up the data: either an in-memory string, or a Pathname read lazily at publish-time.
    if opts[:data]
      contents = opts[:data]
    elsif opts[:file]
      contents = Pathname.new(opts[:file])
      raise ArgumentError, "'#{opts[:file]}' does not exist!" unless contents.exist?
    else
      raise ArgumentError, "A :file or :data attr must be provided to publish to S3!"
    end

    # Then Content-Type: honor an explicit override, otherwise infer from the key's extension.
    if opts[:content_type]
      write_opts[:content_type] = opts[:content_type]
    else
      matching_mimes = MIME::Types.type_for(key_name)
      raise ArgumentError, "Can't infer the content-type for '#{key_name}'! Please specify with the :content_type opt." if matching_mimes.empty?
      write_opts[:content_type] = matching_mimes.first.to_s
    end

    # And Cache-Control: an explicit header wins over the :ttl shorthand.
    if opts.has_key?(:cache_control)
      write_opts[:cache_control] = opts[:cache_control]
    else
      write_opts[:cache_control] = "max-age=#{opts[:ttl] || 5}"
    end

    # And ACL, if the 'public-read' default isn't wanted.
    write_opts[:acl] = opts[:acl] if opts[:acl]

    opts[:gzip] = true unless opts.has_key?(:gzip)

    @publish_queue.push({ key_name: key_name, contents: contents, write_opts: write_opts, gzip: opts[:gzip] })
  end

  # Process queued uploads: spin up worker threads and block until they drain the queue.
  def run
    threads = []
    workers_to_use.times { threads << Thread.new { publish_from_queue } }
    threads.each { |t| t.join }
    true
  end

  def inspect
    "#<S3Publisher:#{bucket_name}>"
  end

  private

  # Returns a gzipped copy of the supplied data.
  def gzip data
    gzipped_data = StringIO.open('', 'w+')
    gzip_writer  = Zlib::GzipWriter.new(gzipped_data)
    gzip_writer.write(data)
    gzip_writer.close
    gzipped_data.string
  end

  def publish_from_queue
    loop do
      item = @publish_queue.pop(true)

      try_count = 0
      begin
        # Gzip unless explicitly disabled or the payload is an image (already compressed).
        should_gzip = item[:gzip] != false && !item[:write_opts][:content_type].match('image/')

        # :file contents were deferred as a Pathname; read them now, at publish-time.
        item[:contents] = item[:contents].read if item[:contents].is_a?(Pathname)

        if should_gzip
          item[:write_opts][:content_encoding] = 'gzip'
          item[:contents] = gzip(item[:contents])
        end

        write_opts = { bucket: bucket_name, key: item[:key_name], body: item[:contents] }
        write_opts.merge!(item[:write_opts])

        @s3.put_object(write_opts)
      rescue StandardError => e # backstop against transient S3 errors; allow exactly one retry
        raise e if try_count >= 1
        try_count += 1
        retry
      end

      logger << "Wrote http://#{bucket_name}.s3.amazonaws.com/#{item[:key_name]} with #{item[:write_opts].inspect}\n"
    end
  rescue ThreadError
    # ThreadError is raised by the non-blocking pop on an empty queue.
    # Simply jump out of the loop and return to run's join().
  end
end
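
# A minimal end-to-end sketch. It assumes AWS credentials are configured via
# the global Aws config and that a bucket named 'example-bucket' exists (both
# are illustrative assumptions, not something this file provides). The guard
# keeps the demo from running when the file is required as a library.
if __FILE__ == $PROGRAM_NAME
  S3Publisher.publish('example-bucket', workers: 2) do |p|
    p.push('hello.txt', data: 'hello world', ttl: 300) # inline data, 5-minute Cache-Control
    p.push('report.csv', file: 'report.csv', gzip: false) # read from disk at publish-time
  end
end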