lib/fluent/plugin/out_couch.rb in fluent-plugin-couch-0.3.6 vs lib/fluent/plugin/out_couch.rb in fluent-plugin-couch-0.4.0

- old
+ new

@@ -1,76 +1,97 @@ module Fluent class CouchOutput < BufferedOutput include SetTagKeyMixin config_set_default :include_tag_key, false - + include SetTimeKeyMixin config_set_default :include_time_key, true - + Fluent::Plugin.register_output('couch', self) - + config_param :database, :string - + config_param :host, :string, :default => 'localhost' config_param :port, :string, :default => '5984' config_param :protocol, :string, :default => 'http' - + config_param :refresh_view_index , :string, :default => nil config_param :user, :string, :default => nil config_param :password, :string, :default => nil - + + config_param :update_docs, :bool, :default => false + config_param :doc_key_field, :string, :default => nil + def initialize super require 'msgpack' Encoding.default_internal = 'UTF-8' require 'couchrest' Encoding.default_internal = 'ASCII-8BIT' end - + def configure(conf) super end - + def start super - if @user && @password - @db = CouchRest.database!("#{@protocol}://#{@user}:#{@password}@#{@host}:#{@port}/#{@database}") - else - @db = CouchRest.database!("#{@protocol}://#{@host}:#{@port}/#{@database}") - end + account = "#{@user}:#{@password}@" if @user && @password + @db = CouchRest.database!("#{@protocol}://#{account}#{@host}:#{@port}/#{@database}") @views = [] if @refresh_view_index begin @db.get("_design/#{@refresh_view_index}")['views'].each do |view_name,func| @views.push([@refresh_view_index,view_name]) end - rescue + rescue puts 'design document not found!' end end end - + def shutdown super end - + def format(tag, time, record) record.to_msgpack end - + def write(chunk) records = [] - chunk.msgpack_each {|record| records << record } - @db.bulk_save(records) - update_view_index() + chunk.msgpack_each {|record| + record['_id'] = record[@doc_key_field] unless @doc_key_field.nil? + records << record + } + unless @update_docs + @db.bulk_save(records) + else + update_docs(records) + end + update_view_index end - + + def update_docs(records) + if records.length > 0 + records.each{|record| + doc = nil + begin + doc = @db.get(record['_id']) + rescue + end + record['_rev']=doc['_rev'] unless doc.nil? + puts record + @db.save_doc(record) + } + end + end + def update_view_index() @views.each do |design,view| @db.view("#{design}/#{view}",{"limit"=>"0"}) end end - end end