# Author: Rodrigo De Castro # Date: 2013-09-20 # # Copyright 2013 Google Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. require "logstash/outputs/base" require "logstash/namespace" require "logstash/json" # Summary: plugin to upload log events to Google BigQuery (BQ), rolling # files based on the date pattern provided as a configuration setting. Events # are written to files locally and, once file is closed, this plugin uploads # it to the configured BigQuery dataset. # # VERY IMPORTANT: # 1 - To make good use of BigQuery, your log events should be parsed and # structured. Consider using grok to parse your events into fields that can # be uploaded to BQ. # 2 - You must configure your plugin so it gets events with the same structure, # so the BigQuery schema suits them. In case you want to upload log events # with different structures, you can utilize multiple configuration blocks, # separating different log events with Logstash conditionals. More details on # Logstash conditionals can be found here: # http://logstash.net/docs/1.2.1/configuration#conditionals # # For more info on Google BigQuery, please go to: # https://developers.google.com/bigquery/ # # In order to use this plugin, a Google service account must be used. For # more information, please refer to: # https://developers.google.com/storage/docs/authentication#service_accounts # # Recommendations: # a - Experiment with the settings depending on how much log data you generate, # your needs to see "fresh" data, and how much data you could lose in the event # of crash. For instance, if you want to see recent data in BQ quickly, you # could configure the plugin to upload data every minute or so (provided you # have enough log events to justify that). Note also, that if uploads are too # frequent, there is no guarantee that they will be imported in the same order, # so later data may be available before earlier data. # b - BigQuery charges for storage and for queries, depending on how much data # it reads to perform a query. These are other aspects to consider when # considering the date pattern which will be used to create new tables and also # how to compose the queries when using BQ. For more info on BigQuery Pricing, # please access: # https://developers.google.com/bigquery/pricing # # USAGE: # This is an example of logstash config: # # output { # google_bigquery { # project_id => "folkloric-guru-278" (required) # dataset => "logs" (required) # csv_schema => "path:STRING,status:INTEGER,score:FLOAT" (required) # key_path => "/path/to/privatekey.p12" (required) # key_password => "notasecret" (optional) # service_account => "1234@developer.gserviceaccount.com" (required) # temp_directory => "/tmp/logstash-bq" (optional) # temp_file_prefix => "logstash_bq" (optional) # date_pattern => "%Y-%m-%dT%H:00" (optional) # flush_interval_secs => 2 (optional) # uploader_interval_secs => 60 (optional) # deleter_interval_secs => 60 (optional) # } # } # # Improvements TODO list: # - Refactor common code between Google BQ and GCS plugins. # - Turn Google API code into a Plugin Mixin (like AwsConfig). # - There's no recover method, so if logstash/plugin crashes, files may not # be uploaded to BQ. class LogStash::Outputs::GoogleBigQuery < LogStash::Outputs::Base config_name "google_bigquery" # Google Cloud Project ID (number, not Project Name!). config :project_id, :validate => :string, :required => true # BigQuery dataset to which these events will be added to. config :dataset, :validate => :string, :required => true # BigQuery table ID prefix to be used when creating new tables for log data. # Table name will be _ config :table_prefix, :validate => :string, :default => "logstash" # Schema for log data. It must follow this format: # :,:,... # Example: path:STRING,status:INTEGER,score:FLOAT config :csv_schema, :validate => :string, :required => true # Indicates if BigQuery should allow extra values that are not represented in the table schema. # If true, the extra values are ignored. If false, records with extra columns are treated as bad records, and if there are too many bad records, an invalid error is returned in the job result. The default value is false. config :ignore_unknown_values, :validate => :boolean, :default => false # Path to private key file for Google Service Account. config :key_path, :validate => :string, :required => true # Private key password for service account private key. config :key_password, :validate => :string, :default => "notasecret" # Service account to access Google APIs. config :service_account, :validate => :string, :required => true # Directory where temporary files are stored. # Defaults to /tmp/logstash-bq- config :temp_directory, :validate => :string, :default => "" # Temporary local file prefix. Log file will follow the format: # _hostname_date.part?.log config :temp_file_prefix, :validate => :string, :default => "logstash_bq" # Time pattern for BigQuery table, defaults to hourly tables. # Must Time.strftime patterns: www.ruby-doc.org/core-2.0/Time.html#method-i-strftime config :date_pattern, :validate => :string, :default => "%Y-%m-%dT%H:00" # Flush interval in seconds for flushing writes to log files. 0 will flush # on every message. config :flush_interval_secs, :validate => :number, :default => 2 # Uploader interval when uploading new files to BigQuery. Adjust time based # on your time pattern (for example, for hourly files, this interval can be # around one hour). config :uploader_interval_secs, :validate => :number, :default => 60 # Deleter interval when checking if upload jobs are done for file deletion. # This only affects how long files are on the hard disk after the job is done. config :deleter_interval_secs, :validate => :number, :default => 60 public def register require 'csv' require "fileutils" require "thread" @logger.debug("BQ: register plugin") @fields = Array.new CSV.parse(@csv_schema.gsub('\"', '""')).flatten.each do |field| temp = field.strip.split(":") # Check that the field in the schema follows the format (:) if temp.length != 2 raise "BigQuery schema must follow the format :" end @fields << { "name" => temp[0], "type" => temp[1] } end # Check that we have at least one field in the schema if @fields.length == 0 raise "BigQuery schema must contain at least one field" end @json_schema = { "fields" => @fields } @upload_queue = Queue.new @delete_queue = Queue.new @last_flush_cycle = Time.now initialize_temp_directory() initialize_current_log() initialize_google_client() initialize_uploader() initialize_deleter() end # Method called for each log event. It writes the event to the current output # file, flushing depending on flush interval configuration. public def receive(event) return unless output?(event) @logger.debug("BQ: receive method called", :event => event) # Message must be written as json message = event.to_json # Remove "@" from property names message = message.gsub(/\"@(\w+)\"/, '"\1"') new_base_path = get_base_path() # Time to roll file based on the date pattern? Or are we due to upload it to BQ? if (@current_base_path != new_base_path || Time.now - @last_file_time >= @uploader_interval_secs) @logger.debug("BQ: log file will be closed and uploaded", :filename => File.basename(@temp_file.to_path), :size => @temp_file.size.to_s, :uploader_interval_secs => @uploader_interval_secs.to_s) # Close alone does not guarantee that data is physically written to disk, # so flushing it before. @temp_file.fsync() @temp_file.close() initialize_next_log() end @temp_file.write(message) @temp_file.write("\n") sync_log_file() @logger.debug("BQ: event appended to log file", :filename => File.basename(@temp_file.to_path)) end public def teardown @logger.debug("BQ: teardown method called") @temp_file.flush() @temp_file.close() end private ## # Flushes temporary log file every flush_interval_secs seconds or so. # This is triggered by events, but if there are no events there's no point # flushing files anyway. # # Inspired by lib/logstash/outputs/file.rb (flush(fd), flush_pending_files) def sync_log_file if flush_interval_secs <= 0 @temp_file.fsync return end return unless Time.now - @last_flush_cycle >= flush_interval_secs @temp_file.fsync @logger.debug("BQ: flushing file", :path => @temp_file.to_path, :fd => @temp_file) @last_flush_cycle = Time.now end ## # Creates temporary directory, if it does not exist. # # A random suffix is appended to the temporary directory def initialize_temp_directory if @temp_directory.empty? require "stud/temporary" @temp_directory = Stud::Temporary.directory("logstash-bq") @logger.info("BQ: temporary directory generated", :directory => @temp_directory) end if !(File.directory? @temp_directory) @logger.debug("BQ: directory doesn't exist. Creating it.", :directory => @temp_directory) FileUtils.mkdir_p(@temp_directory) end end ## # Starts thread to delete uploaded log files once their jobs are done. # # Deleter is done in a separate thread, not holding the receive method above. def initialize_deleter @uploader = Thread.new do @logger.debug("BQ: starting deleter") while true delete_item = @delete_queue.pop job_id = delete_item["job_id"] filename = delete_item["filename"] job_status = get_job_status(job_id) case job_status["state"] when "DONE" if job_status.has_key?("errorResult") @logger.error("BQ: job failed, please enable debug and check full "\ "response (probably the issue is an incompatible "\ "schema). NOT deleting local file.", :job_id => job_id, :filename => filename, :job_status => job_status) else @logger.debug("BQ: job is done, deleting local temporary file ", :job_id => job_id, :filename => filename, :job_status => job_status) File.delete(filename) end when "PENDING", "RUNNING" @logger.debug("BQ: job is not done, NOT deleting local file yet.", :job_id => job_id, :filename => filename, :job_status => job_status) @delete_queue << delete_item else @logger.error("BQ: unknown job status, please enable debug and "\ "check full response (probably the issue is an "\ "incompatible schema). NOT deleting local file yet.", :job_id => job_id, :filename => filename, :job_status => job_status) end sleep @deleter_interval_secs end end end ## # Starts thread to upload log files. # # Uploader is done in a separate thread, not holding the receive method above. def initialize_uploader @uploader = Thread.new do @logger.debug("BQ: starting uploader") while true filename = @upload_queue.pop # Reenqueue if it is still the current file. if filename == @temp_file.to_path if @current_base_path == get_base_path() if Time.now - @last_file_time < @uploader_interval_secs @logger.debug("BQ: reenqueue as log file is being currently appended to.", :filename => filename) @upload_queue << filename # If we got here, it means that older files were uploaded, so let's # wait another minute before checking on this file again. sleep @uploader_interval_secs next else @logger.debug("BQ: flush and close file to be uploaded.", :filename => filename) @temp_file.flush() @temp_file.close() initialize_next_log() end end end if File.size(filename) > 0 job_id = upload_object(filename) @delete_queue << { "filename" => filename, "job_id" => job_id } else @logger.debug("BQ: skipping empty file.") @logger.debug("BQ: delete local temporary file ", :filename => filename) File.delete(filename) end sleep @uploader_interval_secs end end end ## # Returns undated path used to construct base path and final full path. # This path only includes directory, prefix, and hostname info. def get_undated_path return @temp_directory + File::SEPARATOR + @temp_file_prefix + "_" + Socket.gethostname() end ## # Returns base path to log file that is invariant regardless of any # user options. def get_base_path return get_undated_path() + "_" + Time.now.strftime(@date_pattern) end ## # Returns full path to the log file based on global variables (like # current_base_path) and configuration options (max file size). def get_full_path return @current_base_path + ".part" + ("%03d" % @size_counter) + ".log" end ## # Returns date from a temporary log file name. def get_date_pattern(filename) match = /^#{get_undated_path()}_(?.*)\.part(\d+)\.log$/.match(filename) return match[:date] end ## # Returns latest part number for a base path. This method checks all existing # log files in order to find the highest part number, so this file can be used # for appending log events. # # Only applicable if max file size is enabled. def get_latest_part_number(base_path) part_numbers = Dir.glob(base_path + ".part*.log").map do |item| match = /^.*\.part(?\d+).log$/.match(item) next if match.nil? match[:part_num].to_i end return part_numbers.max if part_numbers.any? 0 end ## # Opens current log file and updates @temp_file with an instance of IOWriter. # This method also adds file to the upload queue. def open_current_file() path = get_full_path() stat = File.stat(path) rescue nil if stat and stat.ftype == "fifo" and RUBY_PLATFORM == "java" fd = java.io.FileWriter.new(java.io.File.new(path)) else fd = File.new(path, "a") end @temp_file = IOWriter.new(fd) @upload_queue << @temp_file.to_path end ## # Opens log file on plugin initialization, trying to resume from an existing # file. If max file size is enabled, find the highest part number and resume # from it. def initialize_current_log @current_base_path = get_base_path @last_file_time = Time.now @size_counter = get_latest_part_number(@current_base_path) @logger.debug("BQ: resuming from latest part.", :part => @size_counter) open_current_file() end ## # Generates new log file name based on configuration options and opens log # file. If max file size is enabled, part number if incremented in case the # the base log file name is the same (e.g. log file was not rolled given the # date pattern). def initialize_next_log new_base_path = get_base_path @size_counter = @current_base_path == new_base_path ? @size_counter + 1 : 0 @logger.debug("BQ: opening next log file.", :filename => @current_base_path, :part => @size_counter) @current_base_path = new_base_path @last_file_time = Time.now open_current_file() end ## # Initializes Google Client instantiating client and authorizing access. def initialize_google_client require "google/api_client" require "openssl" @client = Google::APIClient.new(:application_name => 'Logstash Google BigQuery output plugin', :application_version => '0.1') @bq = @client.discovered_api('bigquery', 'v2') key = Google::APIClient::PKCS12.load_key(@key_path, @key_password) # Authorization scope reference: # https://developers.google.com/bigquery/docs/authorization service_account = Google::APIClient::JWTAsserter.new(@service_account, 'https://www.googleapis.com/auth/bigquery', key) @client.authorization = service_account.authorize end ## # Uploads a local file to the configured bucket. def get_job_status(job_id) begin @logger.debug("BQ: check job status.", :job_id => job_id) get_result = @client.execute(:api_method => @bq.jobs.get, :parameters => { 'jobId' => job_id, 'projectId' => @project_id }) response = LogStash::Json.load(get_result.response.body) @logger.debug("BQ: successfully invoked API.", :response => response) if response.has_key?("error") raise response["error"] end # Successful invocation contents = response["status"] return contents rescue => e @logger.error("BQ: failed to check status", :exception => e) # TODO(rdc): limit retries? sleep 1 retry end end ## # Uploads a local file to the configured bucket. def upload_object(filename) begin table_id = @table_prefix + "_" + get_date_pattern(filename) # BQ does not accept anything other than alphanumeric and _ # Ref: https://developers.google.com/bigquery/browser-tool-quickstart?hl=en table_id.tr!(':-','_') @logger.debug("BQ: upload object.", :filename => filename, :table_id => table_id) media = Google::APIClient::UploadIO.new(filename, "application/octet-stream") body = { "configuration" => { "load" => { "sourceFormat" => "NEWLINE_DELIMITED_JSON", "schema" => @json_schema, "destinationTable" => { "projectId" => @project_id, "datasetId" => @dataset, "tableId" => table_id }, 'createDisposition' => 'CREATE_IF_NEEDED', 'writeDisposition' => 'WRITE_APPEND', 'ignoreUnknownValues' => @ignore_unknown_values } } } insert_result = @client.execute(:api_method => @bq.jobs.insert, :body_object => body, :parameters => { 'uploadType' => 'multipart', 'projectId' => @project_id }, :media => media) job_id = LogStash::Json.load(insert_result.response.body)["jobReference"]["jobId"] @logger.debug("BQ: multipart insert", :job_id => job_id) return job_id rescue => e @logger.error("BQ: failed to upload file", :exception => e) # TODO(rdc): limit retries? sleep 1 retry end end end ## # Wrapper class that abstracts which IO being used (for instance, regular # files or GzipWriter. # # Inspired by lib/logstash/outputs/file.rb. class IOWriter def initialize(io) @io = io end def write(*args) @io.write(*args) end def flush @io.flush end def method_missing(method_name, *args, &block) if @io.respond_to?(method_name) @io.send(method_name, *args, &block) else super end end attr_accessor :active end