# Author: Rodrigo De Castro <rdc@google.com>
# Date: 2013-09-20
#
# Copyright 2013 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require "logstash/outputs/base"
require "logstash/namespace"

# Summary: plugin to upload log events to Google BigQuery (BQ), rolling
# files based on the date pattern provided as a configuration setting. Events
# are written to files locally and, once a file is closed, this plugin uploads
# it to the configured BigQuery dataset.
#
# VERY IMPORTANT:
# 1 - To make good use of BigQuery, your log events should be parsed and
# structured. Consider using grok to parse your events into fields that can
# be uploaded to BQ.
# 2 - You must configure your plugin so it gets events with the same structure,
# so that the BigQuery schema suits them. If you want to upload log events
# with different structures, you can use multiple configuration blocks,
# separating different log events with Logstash conditionals. More details on
# Logstash conditionals can be found here:
# http://logstash.net/docs/1.2.1/configuration#conditionals
#
# For more info on Google BigQuery, please go to:
# https://developers.google.com/bigquery/
#
# In order to use this plugin, a Google service account must be used. For
# more information, please refer to:
# https://developers.google.com/storage/docs/authentication#service_accounts
#
# Recommendations:
# a - Experiment with the settings depending on how much log data you generate,
# your need to see "fresh" data, and how much data you could afford to lose in
# the event of a crash. For instance, if you want to see recent data in BQ
# quickly, you could configure the plugin to upload data every minute or so
# (provided you have enough log events to justify that). Note also that if
# uploads are too frequent, there is no guarantee that they will be imported in
# the same order, so later data may be available before earlier data.
# b - BigQuery charges for storage and for queries, depending on how much data
# it reads to perform a query. These are additional aspects to consider when
# choosing the date pattern used to create new tables and when composing the
# queries you run against BQ. For more info on BigQuery pricing, please see:
# https://developers.google.com/bigquery/pricing
#
# USAGE:
# This is an example of Logstash config:
#
# output {
#    google_bigquery {
#      project_id => "folkloric-guru-278"                        (required)
#      dataset => "logs"                                         (required)
#      csv_schema => "path:STRING,status:INTEGER,score:FLOAT"    (required)
#      key_path => "/path/to/privatekey.p12"                     (required)
#      key_password => "notasecret"                              (optional)
#      service_account => "1234@developer.gserviceaccount.com"   (required)
#      temp_directory => "/tmp/logstash-bq"                      (optional)
#      temp_file_prefix => "logstash_bq"                         (optional)
#      date_pattern => "%Y-%m-%dT%H:00"                          (optional)
#      flush_interval_secs => 2                                  (optional)
#      uploader_interval_secs => 60                              (optional)
#      deleter_interval_secs => 60                               (optional)
#    }
# }
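#
# If your pipeline carries events with different structures, a hypothetical
# sketch using Logstash conditionals (the "type" field and its value below are
# assumptions for illustration, not part of this plugin) could look like:
#
# output {
#    if [type] == "apache" {
#      google_bigquery {
#        project_id => "folkloric-guru-278"
#        dataset => "logs"
#        csv_schema => "path:STRING,status:INTEGER,score:FLOAT"
#        key_path => "/path/to/privatekey.p12"
#        service_account => "1234@developer.gserviceaccount.com"
#      }
#    }
# }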
#
# Improvements TODO list:
# - Refactor common code between Google BQ and GCS plugins.
# - Turn Google API code into a Plugin Mixin (like AwsConfig).
# - There is no recover method, so if Logstash or the plugin crashes, files
# may not be uploaded to BQ.
class LogStash::Outputs::GoogleBigQuery < LogStash::Outputs::Base
  config_name "google_bigquery"
  milestone 1

  # Google Cloud Project ID (number, not Project Name!).
  config :project_id, :validate => :string, :required => true

  # BigQuery dataset to which these events will be added.
  config :dataset, :validate => :string, :required => true

  # BigQuery table ID prefix to be used when creating new tables for log data.
  # Table name will be <table_prefix>_<date>.
  config :table_prefix, :validate => :string, :default => "logstash"

  # Schema for log data. It must follow this format:
  # <field1-name>:<field1-type>,<field2-name>:<field2-type>,...
  # Example: path:STRING,status:INTEGER,score:FLOAT
  config :csv_schema, :validate => :string, :required => true

  # Path to private key file for Google Service Account.
  config :key_path, :validate => :string, :required => true

  # Private key password for service account private key.
  config :key_password, :validate => :string, :default => "notasecret"

  # Service account to access Google APIs.
  config :service_account, :validate => :string, :required => true

  # Directory where temporary files are stored.
  # Defaults to /tmp/logstash-bq-<random-suffix>
  config :temp_directory, :validate => :string, :default => ""

  # Temporary local file prefix. Log file will follow the format:
  # <prefix>_hostname_date.part?.log
  config :temp_file_prefix, :validate => :string, :default => "logstash_bq"

  # Time pattern for BigQuery table, defaults to hourly tables.
  # Must be a Time.strftime pattern:
  # www.ruby-doc.org/core-2.0/Time.html#method-i-strftime
  config :date_pattern, :validate => :string, :default => "%Y-%m-%dT%H:00"

  # Flush interval in seconds for flushing writes to log files. 0 will flush
  # on every message.
  config :flush_interval_secs, :validate => :number, :default => 2

  # Uploader interval when uploading new files to BigQuery. Adjust time based
  # on your date pattern (for example, for hourly files, this interval can be
  # around one hour).
  config :uploader_interval_secs, :validate => :number, :default => 60

  # Deleter interval when checking if upload jobs are done for file deletion.
  # This only affects how long files stay on the hard disk after the job is
  # done.
  config :deleter_interval_secs, :validate => :number, :default => 60

  public
  def register
    require "csv"
    require "fileutils"
    require "thread"

    @logger.debug("BQ: register plugin")

    @fields = Array.new

    CSV.parse(@csv_schema.gsub('\"', '""')).flatten.each do |field|
      temp = field.strip.split(":")

      # Check that the field in the schema follows the format (<name>:<type>)
      if temp.length != 2
        raise "BigQuery schema must follow the format <field-name>:<field-type>"
      end

      @fields << { "name" => temp[0], "type" => temp[1] }
    end

    # Check that we have at least one field in the schema
    if @fields.length == 0
      raise "BigQuery schema must contain at least one field"
    end

    @json_schema = { "fields" => @fields }

    @upload_queue = Queue.new
    @delete_queue = Queue.new
    @last_flush_cycle = Time.now
    initialize_temp_directory()
    initialize_current_log()
    initialize_google_client()
    initialize_uploader()
    initialize_deleter()
  end
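
  # For illustration only (not executed): given
  #   csv_schema => "path:STRING,status:INTEGER,score:FLOAT"
  # the register method above builds @json_schema as:
  #   {
  #     "fields" => [
  #       { "name" => "path",   "type" => "STRING" },
  #       { "name" => "status", "type" => "INTEGER" },
  #       { "name" => "score",  "type" => "FLOAT" }
  #     ]
  #   }
  # which is later passed as the load job schema in upload_object.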

  # Method called for each log event. It writes the event to the current output
  # file, flushing depending on the flush interval configuration.
  public
  def receive(event)
    return unless output?(event)

    @logger.debug("BQ: receive method called", :event => event)

    # Message must be written as JSON
    message = event.to_json
    # Remove "@" from property names
    message = message.gsub(/\"@(\w+)\"/, '"\1"')

    new_base_path = get_base_path()

    # Time to roll the file based on the date pattern? Or are we due to upload
    # it to BQ?
    if (@current_base_path != new_base_path ||
        Time.now - @last_file_time >= @uploader_interval_secs)
      @logger.debug("BQ: log file will be closed and uploaded",
                    :filename => File.basename(@temp_file.to_path),
                    :size => @temp_file.size.to_s,
                    :uploader_interval_secs => @uploader_interval_secs.to_s)
      # Close alone does not guarantee that data is physically written to
      # disk, so flush it beforehand.
      @temp_file.fsync()
      @temp_file.close()
      initialize_next_log()
    end

    @temp_file.write(message)
    @temp_file.write("\n")

    sync_log_file()

    @logger.debug("BQ: event appended to log file",
                  :filename => File.basename(@temp_file.to_path))
  end

  public
  def teardown
    @logger.debug("BQ: teardown method called")

    @temp_file.flush()
    @temp_file.close()
  end

  private

  ##
  # Flushes temporary log file every flush_interval_secs seconds or so.
  # This is triggered by events, but if there are no events there's no point
  # in flushing files anyway.
  #
  # Inspired by lib/logstash/outputs/file.rb (flush(fd), flush_pending_files)
  def sync_log_file
    if @flush_interval_secs <= 0
      @temp_file.fsync
      return
    end

    return unless Time.now - @last_flush_cycle >= @flush_interval_secs
    @temp_file.fsync
    @logger.debug("BQ: flushing file",
                  :path => @temp_file.to_path,
                  :fd => @temp_file)
    @last_flush_cycle = Time.now
  end

  ##
  # Creates the temporary directory, if it does not exist.
  #
  # A random suffix is appended to the temporary directory.
  def initialize_temp_directory
    if @temp_directory.empty?
      require "stud/temporary"
      @temp_directory = Stud::Temporary.directory("logstash-bq")
      @logger.info("BQ: temporary directory generated",
                   :directory => @temp_directory)
    end

    if !(File.directory? @temp_directory)
      @logger.debug("BQ: directory doesn't exist. Creating it.",
                    :directory => @temp_directory)
      FileUtils.mkdir_p(@temp_directory)
    end
  end

  ##
  # Starts a thread to delete uploaded log files once their jobs are done.
  #
  # Deletion runs in a separate thread so that it does not block the receive
  # method above.
  def initialize_deleter
    @deleter = Thread.new do
      @logger.debug("BQ: starting deleter")
      while true
        delete_item = @delete_queue.pop
        job_id = delete_item["job_id"]
        filename = delete_item["filename"]
        job_status = get_job_status(job_id)
        case job_status["state"]
        when "DONE"
          if job_status.has_key?("errorResult")
            @logger.error("BQ: job failed, please enable debug and check full "\
                          "response (probably the issue is an incompatible "\
                          "schema). NOT deleting local file.",
                          :job_id => job_id,
                          :filename => filename,
                          :job_status => job_status)
          else
            @logger.debug("BQ: job is done, deleting local temporary file.",
                          :job_id => job_id,
                          :filename => filename,
                          :job_status => job_status)
            File.delete(filename)
          end
        when "PENDING", "RUNNING"
          @logger.debug("BQ: job is not done, NOT deleting local file yet.",
                        :job_id => job_id,
                        :filename => filename,
                        :job_status => job_status)
          @delete_queue << delete_item
        else
          @logger.error("BQ: unknown job status, please enable debug and "\
                        "check full response (probably the issue is an "\
                        "incompatible schema). NOT deleting local file yet.",
                        :job_id => job_id,
                        :filename => filename,
                        :job_status => job_status)
        end

        sleep @deleter_interval_secs
      end
    end
  end
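
  # For illustration only (not executed): the deleter thread above inspects the
  # "status" object returned by get_job_status. Following the BigQuery jobs.get
  # API, it looks roughly like one of (exact contents may vary):
  #   { "state" => "PENDING" } or { "state" => "RUNNING" }   # still in flight
  #   { "state" => "DONE" }                                  # success
  #   { "state" => "DONE",
  #     "errorResult" => { "reason" => "...", "message" => "..." } }  # failure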

  ##
  # Starts a thread to upload log files.
  #
  # Uploading runs in a separate thread so that it does not block the receive
  # method above.
  def initialize_uploader
    @uploader = Thread.new do
      @logger.debug("BQ: starting uploader")
      while true
        filename = @upload_queue.pop

        # Re-enqueue if it is still the current file.
        if filename == @temp_file.to_path
          if @current_base_path == get_base_path()
            if Time.now - @last_file_time < @uploader_interval_secs
              @logger.debug("BQ: re-enqueue as log file is currently being "\
                            "appended to.",
                            :filename => filename)
              @upload_queue << filename
              # If we got here, it means that older files were uploaded, so
              # let's wait another interval before checking on this file again.
              sleep @uploader_interval_secs
              next
            else
              @logger.debug("BQ: flush and close file to be uploaded.",
                            :filename => filename)
              @temp_file.flush()
              @temp_file.close()
              initialize_next_log()
            end
          end
        end

        if File.size(filename) > 0
          job_id = upload_object(filename)
          @delete_queue << { "filename" => filename, "job_id" => job_id }
        else
          @logger.debug("BQ: skipping empty file.")
          @logger.debug("BQ: delete local temporary file.",
                        :filename => filename)
          File.delete(filename)
        end

        sleep @uploader_interval_secs
      end
    end
  end

  ##
  # Returns the undated path used to construct the base path and the final
  # full path. This path only includes directory, prefix, and hostname info.
  def get_undated_path
    return @temp_directory + File::SEPARATOR + @temp_file_prefix + "_" +
      Socket.gethostname()
  end

  ##
  # Returns the base path to the log file, which is invariant regardless of
  # any user options.
  def get_base_path
    return get_undated_path() + "_" + Time.now.strftime(@date_pattern)
  end

  ##
  # Returns the full path to the log file based on global variables (like
  # current_base_path) and configuration options (like max file size).
  def get_full_path
    return @current_base_path + ".part" + ("%03d" % @size_counter) + ".log"
  end

  ##
  # Returns the date from a temporary log file name.
  def get_date_pattern(filename)
    match = /^#{get_undated_path()}_(?<date>.*)\.part(\d+)\.log$/.match(filename)
    return match[:date]
  end

  ##
  # Returns the latest part number for a base path. This method checks all
  # existing log files in order to find the highest part number, so that file
  # can be used for appending log events.
  #
  # Only applicable if max file size is enabled.
  def get_latest_part_number(base_path)
    part_numbers = Dir.glob(base_path + ".part*.log").map do |item|
      match = /^.*\.part(?<part_num>\d+)\.log$/.match(item)
      next if match.nil?
      match[:part_num].to_i
    end

    return part_numbers.max if part_numbers.any?
    0
  end

  ##
  # Opens the current log file and updates @temp_file with an instance of
  # IOWriter. This method also adds the file to the upload queue.
  def open_current_file()
    path = get_full_path()
    stat = File.stat(path) rescue nil
    if stat and stat.ftype == "fifo" and RUBY_PLATFORM == "java"
      fd = java.io.FileWriter.new(java.io.File.new(path))
    else
      fd = File.new(path, "a")
    end
    @temp_file = IOWriter.new(fd)
    @upload_queue << @temp_file.to_path
  end

  ##
  # Opens the log file on plugin initialization, trying to resume from an
  # existing file. If max file size is enabled, find the highest part number
  # and resume from it.
  def initialize_current_log
    @current_base_path = get_base_path
    @last_file_time = Time.now
    @size_counter = get_latest_part_number(@current_base_path)
    @logger.debug("BQ: resuming from latest part.",
                  :part => @size_counter)
    open_current_file()
  end
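
  # For illustration only (not executed): with the default settings, a
  # hypothetical hostname "myhost", and the default date_pattern, the path
  # helpers above compose names like the following, where "XXXX" stands for
  # the random suffix added by Stud::Temporary:
  #   get_undated_path => "/tmp/logstash-bq-XXXX/logstash_bq_myhost"
  #   get_base_path    => "/tmp/logstash-bq-XXXX/logstash_bq_myhost_2013-09-20T14:00"
  #   get_full_path    => "/tmp/logstash-bq-XXXX/logstash_bq_myhost_2013-09-20T14:00.part000.log"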

  ##
  # Generates a new log file name based on configuration options and opens the
  # log file. If max file size is enabled, the part number is incremented in
  # case the base log file name is the same (e.g. the log file was not rolled
  # given the date pattern).
  def initialize_next_log
    new_base_path = get_base_path
    @size_counter = @current_base_path == new_base_path ? @size_counter + 1 : 0
    @logger.debug("BQ: opening next log file.",
                  :filename => @current_base_path,
                  :part => @size_counter)
    @current_base_path = new_base_path
    @last_file_time = Time.now
    open_current_file()
  end

  ##
  # Initializes the Google API client, instantiating it and authorizing access.
  def initialize_google_client
    require "google/api_client"
    require "openssl"

    @client = Google::APIClient.new(
      :application_name => 'Logstash Google BigQuery output plugin',
      :application_version => '0.1')
    @bq = @client.discovered_api('bigquery', 'v2')

    key = Google::APIClient::PKCS12.load_key(@key_path, @key_password)
    # Authorization scope reference:
    # https://developers.google.com/bigquery/docs/authorization
    service_account = Google::APIClient::JWTAsserter.new(
      @service_account,
      'https://www.googleapis.com/auth/bigquery',
      key)
    @client.authorization = service_account.authorize
  end

  ##
  # Checks the status of an upload (load) job in BigQuery.
  def get_job_status(job_id)
    begin
      require 'json'

      @logger.debug("BQ: check job status.",
                    :job_id => job_id)
      get_result = @client.execute(:api_method => @bq.jobs.get,
                                   :parameters => {
                                     'jobId' => job_id,
                                     'projectId' => @project_id
                                   })
      response = JSON.parse(get_result.response.body)
      @logger.debug("BQ: successfully invoked API.",
                    :response => response)

      if response.has_key?("error")
        raise response["error"]
      end

      # Successful invocation
      contents = response["status"]
      return contents
    rescue => e
      @logger.error("BQ: failed to check status", :exception => e)
      # TODO(rdc): limit retries?
      sleep 1
      retry
    end
  end

  ##
  # Uploads a local file to BigQuery by creating a load job for it.
  def upload_object(filename)
    begin
      require 'json'

      table_id = @table_prefix + "_" + get_date_pattern(filename)
      # BQ does not accept anything other than alphanumeric characters and _
      # Ref: https://developers.google.com/bigquery/browser-tool-quickstart?hl=en
      table_id = table_id.gsub(':', '_').gsub('-', '_')

      @logger.debug("BQ: upload object.",
                    :filename => filename,
                    :table_id => table_id)
      media = Google::APIClient::UploadIO.new(filename,
                                              "application/octet-stream")
      body = {
        "configuration" => {
          "load" => {
            "sourceFormat" => "NEWLINE_DELIMITED_JSON",
            "schema" => @json_schema,
            "destinationTable" => {
              "projectId" => @project_id,
              "datasetId" => @dataset,
              "tableId" => table_id
            },
            'createDisposition' => 'CREATE_IF_NEEDED',
            'writeDisposition' => 'WRITE_APPEND'
          }
        }
      }
      insert_result = @client.execute(:api_method => @bq.jobs.insert,
                                      :body_object => body,
                                      :parameters => {
                                        'uploadType' => 'multipart',
                                        'projectId' => @project_id
                                      },
                                      :media => media)
      job_id = JSON.parse(insert_result.response.body)["jobReference"]["jobId"]
      @logger.debug("BQ: multipart insert",
                    :job_id => job_id)
      return job_id
    rescue => e
      @logger.error("BQ: failed to upload file", :exception => e)
      # TODO(rdc): limit retries?
      sleep 1
      retry
    end
  end
end

##
# Wrapper class that abstracts which IO is being used (for instance, regular
# files or GzipWriter).
#
# Inspired by lib/logstash/outputs/file.rb.
class IOWriter
  def initialize(io)
    @io = io
  end

  def write(*args)
    @io.write(*args)
  end

  def flush
    @io.flush
  end

  def method_missing(method_name, *args, &block)
    if @io.respond_to?(method_name)
      @io.send(method_name, *args, &block)
    else
      super
    end
  end

  attr_accessor :active
end