require 'yell'

require 'traject'
require 'traject/util'
require 'traject/qualified_const_get'
require 'traject/thread_pool'

require 'json'
require 'httpclient'

require 'uri'
require 'thread'     # for Mutex/Queue
require 'concurrent' # for atomic_fixnum

# Write to Solr using the JSON interface; only works for Solr >= 3.2
#
# This should work under both MRI and JRuby, with JRuby getting much
# better performance due to the threading model.
#
# Relevant settings
#
# * solr.url (optional if solr.update_url is set) The URL to the solr core to index into
#
# * solr.update_url: The actual update url. If unset, we'll first see if
#   "#{solr.url}/update/json" exists, and if not use "#{solr.url}/update"
#
# * solr_writer.batch_size: How big a batch to send to solr. Default is 100.
#   My tests indicate that this setting doesn't change overall index speed by a ton.
#
# * solr_writer.thread_pool: How many threads to use for the writer. Default is 1.
#   Likely useful even under MRI, since threads spend much of their time waiting on Solr.
#
# * solr_writer.max_skipped: How many records can be skipped due to errors before we
#   bail out with a fatal error? Set to -1 for unlimited skips. Default is 0:
#   raise and abort on the first record that could not be added to Solr.
#
# * solr_writer.commit_on_close: Set to true (or "true") if you want to commit at the
#   end of the indexing run. (Old "solrj_writer.commit_on_close" supported for backwards
#   compat only.)
#
# * solr_writer.commit_timeout: If commit_on_close, how long to wait for Solr before
#   giving up as a timeout. Default 10 minutes. Solr can be slow. 
#
# * solr_json_writer.http_client: Mainly intended for testing; set your own HTTPClient
#   or mock object to be used for HTTP.
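#
# Example traject configuration
#
# A minimal sketch of wiring this writer into a traject config file; the core
# URL below is a hypothetical placeholder, and the setting names are the ones
# documented above:
#
#     settings do
#       provide "writer_class_name", "Traject::SolrJsonWriter"
#       provide "solr.url", "http://localhost:8983/solr/example_core"
#       provide "solr_writer.batch_size", 100
#       provide "solr_writer.thread_pool", 1
#       provide "solr_writer.commit_on_close", "true"
#     end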


class Traject::SolrJsonWriter
  include Traject::QualifiedConstGet

  DEFAULT_MAX_SKIPPED = 0
  DEFAULT_BATCH_SIZE  = 100

  # The passed-in settings and the writer thread pool size
  attr_reader :settings, :thread_pool_size

  # A queue to hold documents before sending to solr
  attr_reader :batched_queue

  def initialize(arg_settings)
    @settings = Traject::Indexer::Settings.new(arg_settings)

    # Set max skipped records; a negative setting is stored as nil, meaning unlimited skips
    @max_skipped = (@settings['solr_writer.max_skipped'] || DEFAULT_MAX_SKIPPED).to_i
    if @max_skipped < 0
      @max_skipped = nil
    end

    @http_client = @settings["solr_json_writer.http_client"] || HTTPClient.new

    @batch_size = (settings["solr_writer.batch_size"] || DEFAULT_BATCH_SIZE).to_i
    @batch_size = 1 if @batch_size < 1

    # Store the skipped-record count in a Concurrent::AtomicFixnum, so multiple
    # threads can increment it safely if we're threaded.
    @skipped_record_incrementer = Concurrent::AtomicFixnum.new(0)

    # How many threads to use for the writer?
    # If the thread pool size is 0, Traject::ThreadPool just creates a null
    # threadpool that executes in the calling thread.
    @thread_pool_size = (@settings["solr_writer.thread_pool"] || 1).to_i

    @batched_queue = Queue.new
    @thread_pool = Traject::ThreadPool.new(@thread_pool_size)

    # The old "solrj_writer.commit_on_close" setting is supported for backwards
    # compatibility only, as this becomes the new default writer.
    @commit_on_close = (settings["solr_writer.commit_on_close"] || settings["solrj_writer.commit_on_close"]).to_s == "true"

    # Figure out where to send updates
    @solr_update_url = self.determine_solr_update_url

    logger.info("   #{self.class.name} writing to '#{@solr_update_url}' in batches of #{@batch_size} with #{@thread_pool_size} bg threads")
  end


  # Add a single context to the queue, ready to be sent to Solr
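  #
  # A hypothetical direct-use sketch (traject's Indexer normally calls #put and
  # #close for you; `context` stands in for a Traject::Indexer::Context):
  #
  #     writer = Traject::SolrJsonWriter.new("solr.url" => "http://localhost:8983/solr/example_core")
  #     writer.put(context)
  #     writer.close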
  def put(context)
    @thread_pool.raise_collected_exception!

    @batched_queue << context
    if @batched_queue.size >= @batch_size
      batch = Traject::Util.drain_queue(@batched_queue)
      @thread_pool.maybe_in_thread_pool(batch) {|batch_arg| send_batch(batch_arg) }
    end
  end

  # Send the given batch of contexts. If something goes wrong, send
  # them one at a time.
  # @param [Array<Traject::Indexer::Context>] batch an array of contexts
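  #
  # The body POSTed to Solr is a JSON array of output hashes; a hypothetical
  # two-record payload might look like:
  #
  #     [{"id":["rec1"],"title":["First title"]},
  #      {"id":["rec2"],"title":["Second title"]}]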
  def send_batch(batch)
    return if batch.empty?
    json_package = JSON.generate(batch.map { |c| c.output_hash })
    begin
      resp = @http_client.post @solr_update_url, json_package, "Content-type" => "application/json"
    rescue StandardError => exception
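      # Nothing to do here: the exception is handled below, together with
      # non-200 responses.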
    end

    if exception || resp.status != 200
      error_message = exception ? 
        Traject::Util.exception_to_log_message(exception) : 
        "Solr response: #{resp.status}: #{resp.body}"

      logger.error "Error in Solr batch add. Will retry documents individually at performance penalty: #{error_message}"
      
      batch.each do |c|
        send_single(c)
      end
    end
  end


  # Send a single context to Solr, logging an error if need be
  # @param [Traject::Indexer::Context] c The context whose document you want to send
  def send_single(c)
    json_package = JSON.generate([c.output_hash])
    begin
      resp = @http_client.post @solr_update_url, json_package, "Content-type" => "application/json"
      # Catch Timeouts and network errors as skipped records, but otherwise
      # allow unexpected errors to propagate up. 
    rescue HTTPClient::TimeoutError, SocketError, Errno::ECONNREFUSED => exception
    end

    if exception || resp.status != 200
      if exception
        msg = Traject::Util.exception_to_log_message(exception)
      else
        msg = "Solr error response: #{resp.status}: #{resp.body}"
      end
      logger.error "Could not add record #{record_id_from_context c} at source file position #{c.position}: #{msg}"
      logger.debug(c.source_record.to_s)

      @skipped_record_incrementer.increment
      if @max_skipped and skipped_record_count > @max_skipped
        raise RuntimeError.new("#{self.class.name}: Exceeded maximum number of skipped records (#{@max_skipped}): aborting")
      end

    end

  end


  # Get the logger from the settings, or default to an effectively null logger
  def logger
    settings["logger"] ||= Yell.new(STDERR, :level => "gt.fatal") # null logger
  end

  # Returns MARC 001, then a slash, then output_hash["id"] -- if both
  # are present. Otherwise may return just one, or even an empty string. 
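  # e.g. "ocm0012345/some_solr_doc_id" (hypothetical values).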
  def record_id_from_context(context)
    marc_id = context.source_record && context.source_record['001'] && context.source_record['001'].value
    output_id = context.output_hash["id"]

    return [marc_id, output_id].compact.join("/")
  end


  # On close, we need to (a) raise any exceptions we might have, (b) send off
  # the last (possibly empty) batch, and (c) commit if instructed to do so
  # via the solr_writer.commit_on_close setting.
  def close
    @thread_pool.raise_collected_exception!

    # Finish off whatever's left. Do it in the thread pool for
    # consistency, and to ensure expected order of operations, so
    # it goes to the end of the queue behind any other work.
    batch = Traject::Util.drain_queue(@batched_queue)
    if batch.length > 0
      @thread_pool.maybe_in_thread_pool { send_batch(batch) }
    end

    # Wait for shutdown, and time it.
    logger.debug "#{self.class.name}: Shutting down thread pool, waiting if needed..."
    elapsed = @thread_pool.shutdown_and_wait
    if elapsed > 60
      logger.warn "Waited #{elapsed} seconds for all threads, you may want to increase solr_writer.thread_pool (currently #{@settings["solr_writer.thread_pool"]})"
    end
    logger.debug "#{self.class.name}: Thread pool shutdown complete"
    logger.warn "#{self.class.name}: #{skipped_record_count} skipped records" if skipped_record_count > 0

    # check again now that we've waited, there could still be some
    # that didn't show up before.
    @thread_pool.raise_collected_exception!

    # Commit if we're supposed to
    if @commit_on_close
      commit
    end
  end


  # Send a commit
  def commit
    logger.info "#{self.class.name} sending commit to solr at url #{@solr_update_url}..."

    original_timeout = @http_client.receive_timeout
    @http_client.receive_timeout = (settings["solr_writer.commit_timeout"] || (10 * 60)).to_i

    begin
      resp = @http_client.get(@solr_update_url, {"commit" => 'true'})
      unless resp.status == 200
        raise RuntimeError.new("Could not commit to Solr: #{resp.status} #{resp.body}")
      end
    ensure
      # Restore the original timeout even if the commit request failed
      @http_client.receive_timeout = original_timeout
    end
  end


  # Return count of encountered skipped records. Most accurate to call
  # it after #close, in which case it should include full count, even
  # under async thread_pool.
  def skipped_record_count
    @skipped_record_incrementer.value
  end


  # Determine the update URL to use: solr.update_url if given (after validating it),
  # otherwise one derived from solr.url.
  def determine_solr_update_url
    if settings['solr.update_url']
      check_solr_update_url(settings['solr.update_url'])
    else
      derive_solr_update_url_from_solr_url(settings['solr.url'])
    end
  end


  # If we've got a solr.update_url, make sure it's ok
  def check_solr_update_url(url)
    unless url =~ /^#{URI::regexp}$/
      raise ArgumentError.new("#{self.class.name} setting `solr.update_url` doesn't look like a URL: `#{url}`")
    end
    url
  end

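  # Given e.g. solr.url "http://localhost:8983/solr/example_core" (a hypothetical
  # URL), first try ".../update/json"; if Solr answers 404 for that handler,
  # fall back to ".../update".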
  def derive_solr_update_url_from_solr_url(url)
    # Nil? Then we bail
    if url.nil?
      raise ArgumentError.new("#{self.class.name}: Neither solr.update_url nor solr.url set; need at least one")
    end

    # Not a URL? Bail
    unless url =~ /^#{URI::regexp}$/
      raise ArgumentError.new("#{self.class.name} setting `solr.url` doesn't look like a URL: `#{url}`")
    end

    # First, try the /update/json handler
    candidate = [url.chomp('/'), 'update', 'json'].join('/')
    resp      = @http_client.get(candidate)
    if resp.status == 404
      candidate = [url.chomp('/'), 'update'].join('/')
    end
    candidate
  end


end