# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require "concurrent"
require "mongo"
require_relative "bson/big_decimal"
require_relative "bson/logstash_timestamp"

# This output writes events to MongoDB.
class LogStash::Outputs::MongodbUpsertCustom < LogStash::Outputs::Base

  config_name "mongodb_upsert_custom"

  # A MongoDB URI to connect to.
  # See http://docs.mongodb.org/manual/reference/connection-string/.
  config :uri, :validate => :string, :required => true

  # The database to use.
  config :database, :validate => :string, :required => true

  # The collection to use. This value can use `%{foo}` values to dynamically
  # select a collection based on data in the event.
  config :collection, :validate => :string, :required => true

  # If true, store the @timestamp field in MongoDB as an ISODate type instead
  # of an ISO8601 string. For more information about this, see
  # http://www.mongodb.org/display/DOCS/Dates.
  config :isodate, :validate => :boolean, :default => false

  # The number of seconds to wait after a failure before retrying.
  config :retry_delay, :validate => :number, :default => 3, :required => false

  # If true, a BSON::ObjectId is generated and stored in the document's "_id"
  # field before insertion, overwriting any existing "_id" field in the event.
  config :generateId, :validate => :boolean, :default => false

  # Bulk insert flag. Set to true to enable bulk insertion; otherwise events
  # are inserted one by one.
  config :bulk, :validate => :boolean, :default => false

  # Bulk interval, in seconds. Buffered events are flushed at this interval
  # when the "bulk" flag is enabled.
  config :bulk_interval, :validate => :number, :default => 2

  # Bulk size. If the number of buffered events for a collection reaches this
  # limit, they are flushed immediately regardless of the bulk interval
  # (MongoDB's hard limit is 1000).
  config :bulk_size, :validate => :number, :default => 900, :maximum => 999, :min => 2

  # Comma-separated list of fields whose values are converted to
  # LogStash::Timestamp before insertion, so MongoDB stores them as dates.
  config :date_keys, :validate => :string, :default => nil

  # If true, matching documents are updated with update_many and a "$lte"
  # guard on the "time_stamp" field; otherwise a single document is upserted
  # with update_one.
  config :is_checkpoint, :validate => :boolean, :default => nil

  # Comma-separated pair of field names used to build the upsert filter.
  # When set, it overrides filter_key1 and filter_key2.
  config :update_keys, :validate => :string, :default => nil

  # First field name used in the upsert filter.
  config :filter_key1, :validate => :string, :default => nil

  # Second field name used in the upsert filter.
  config :filter_key2, :validate => :string, :default => nil

  # Mutex used to synchronize access to @documents.
  @@mutex = Mutex.new

  @@log_type = {"OUTPUT" => "output", "INPUT" => "input"}

  def register
    if @bulk_size >= 1000
      raise LogStash::ConfigurationError, "Bulk size must be lower than '1000', currently '#{@bulk_size}'"
    end

    Mongo::Logger.logger = @logger
    # conn = Mongo::Client.new(@uri)
    # @db = conn.use(@database)
    @db = Mongo::Client.new(@uri)

    @closed = Concurrent::AtomicBoolean.new(false)
    @documents = {}

    @bulk_thread = Thread.new(@bulk_interval) do |bulk_interval|
      while @closed.false?
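        # Wake up every bulk_interval seconds and flush any documents buffered
        # by receive() under the shared mutex, clearing each collection's
        # buffer once it has been written.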
        sleep(bulk_interval)
        @@mutex.synchronize do
          @documents.each do |collection, values|
            if values.length > 0
              values.each do |value|
                criteria = Hash.new
                criteria_key1 = @filter_key1
                criteria[criteria_key1] = value[criteria_key1]
                criteria_key2 = @filter_key2
                criteria[criteria_key2] = value[criteria_key2]
                if @is_checkpoint
                  if value['type'] == @@log_type['OUTPUT'] || value['type'] == @@log_type['INPUT']
                    if value['time_stamp']
                      criteria['time_stamp'] = { '$lte': value['time_stamp'] }
                    end
                    @db[collection].find(criteria).update_many({"$set": value}, :upsert => true)
                  end
                else
                  if value['type'] == @@log_type['OUTPUT']
                    @db[collection].find(criteria).update_one(value, :upsert => true, :new => true)
                  end
                end
              end
              @documents.delete(collection)
            end
          end
        end
      end
    end
  end

  def receive(event)
    begin
      # {}.merge(other) so we don't taint the event hash innards.
      document = {}.merge(event.to_hash)

      if !@isodate
        timestamp = event.timestamp
        if timestamp
          # not using timestamp.to_bson
          document["@timestamp"] = timestamp.to_json
        else
          @logger.warn("Cannot set MongoDB document `@timestamp` field because it does not exist in the event", :event => event)
        end
      end

      if @date_keys
        keys = @date_keys.to_s.split(",")
        document.each do |key, value|
          if keys.include?(key)
            document[key] = LogStash::Timestamp.new(value)
          end
        end
      end

      if @update_keys
        filterkeys = @update_keys.to_s.split(",")
        @filter_key1 = filterkeys[0]
        @filter_key2 = filterkeys[1]
      end

      if @generateId
        document["_id"] = BSON::ObjectId.new
      end

      collection = event.sprintf(@collection)

      if @bulk
        @@mutex.synchronize do
          @documents[collection] ||= []
          @documents[collection].push(document)

          if @documents[collection].length >= @bulk_size
            @documents[collection].each do |docRecord|
              criteria = Hash.new
              criteria_key1 = @filter_key1
              criteria[criteria_key1] = docRecord[criteria_key1]
              criteria_key2 = @filter_key2
              criteria[criteria_key2] = docRecord[criteria_key2]
              if @is_checkpoint
                if docRecord['type'] == @@log_type['OUTPUT'] || docRecord['type'] == @@log_type['INPUT']
                  if docRecord['time_stamp']
                    criteria['time_stamp'] = { '$lte': docRecord['time_stamp'] }
                  end
                  @db[collection].find(criteria).update_many({"$set": docRecord}, :upsert => true)
                end
              else
                if docRecord['type'] == @@log_type['OUTPUT']
                  @db[collection].find(criteria).update_one(docRecord, :upsert => true, :new => true)
                end
              end
            end
            @documents.delete(collection)
          end
        end
      else
        criteria = Hash.new
        criteria_key1 = @filter_key1
        criteria[criteria_key1] = document[criteria_key1]
        criteria_key2 = @filter_key2
        criteria[criteria_key2] = document[criteria_key2]
        if @is_checkpoint
          if document['type'] == @@log_type['OUTPUT'] || document['type'] == @@log_type['INPUT']
            if document['time_stamp']
              criteria['time_stamp'] = { '$lte': document['time_stamp'] }
            end
            @db[collection].find(criteria).update_many({"$set": document}, :upsert => true)
          end
        else
          if document['type'] == @@log_type['OUTPUT']
            @db[collection].find(criteria).update_one(document, :upsert => true, :new => true)
          end
        end
      end
    rescue => e
      if e.message =~ /^E11000/
        # On a duplicate key error, skip the insert.
        # We could check if the duplicate key error is on the _id key
        # and generate a new primary key.
        # If the duplicate key error is on another field, we have no way
        # to fix the issue.
@logger.warn("Skipping insert because of a duplicate key error", :event => event, :exception => e) else @logger.warn("Failed to send event to MongoDB, retrying in #{@retry_delay.to_s} seconds", :event => event, :exception => e) sleep(@retry_delay) retry end end end def close @closed.make_true @bulk_thread.wakeup @bulk_thread.join end end