lib/fluent/plugin/out_couch.rb in fluent-plugin-couch-0.3.6 vs lib/fluent/plugin/out_couch.rb in fluent-plugin-couch-0.4.0
- old
+ new
@@ -1,76 +1,97 @@
module Fluent
class CouchOutput < BufferedOutput
include SetTagKeyMixin
config_set_default :include_tag_key, false
-
+
include SetTimeKeyMixin
config_set_default :include_time_key, true
-
+
Fluent::Plugin.register_output('couch', self)
-
+
config_param :database, :string
-
+
config_param :host, :string, :default => 'localhost'
config_param :port, :string, :default => '5984'
config_param :protocol, :string, :default => 'http'
-
+
config_param :refresh_view_index , :string, :default => nil
config_param :user, :string, :default => nil
config_param :password, :string, :default => nil
-
+
+ config_param :update_docs, :bool, :default => false
+ config_param :doc_key_field, :string, :default => nil
+
def initialize
super
require 'msgpack'
Encoding.default_internal = 'UTF-8'
require 'couchrest'
Encoding.default_internal = 'ASCII-8BIT'
end
-
+
def configure(conf)
super
end
-
+
def start
super
- if @user && @password
- @db = CouchRest.database!("#{@protocol}://#{@user}:#{@password}@#{@host}:#{@port}/#{@database}")
- else
- @db = CouchRest.database!("#{@protocol}://#{@host}:#{@port}/#{@database}")
- end
+ account = "#{@user}:#{@password}@" if @user && @password
+ @db = CouchRest.database!("#{@protocol}://#{account}#{@host}:#{@port}/#{@database}")
@views = []
if @refresh_view_index
begin
@db.get("_design/#{@refresh_view_index}")['views'].each do |view_name,func|
@views.push([@refresh_view_index,view_name])
end
- rescue
+ rescue
puts 'design document not found!'
end
end
end
-
+
def shutdown
super
end
-
+
def format(tag, time, record)
record.to_msgpack
end
-
+
def write(chunk)
records = []
- chunk.msgpack_each {|record| records << record }
- @db.bulk_save(records)
- update_view_index()
+ chunk.msgpack_each {|record|
+ record['_id'] = record[@doc_key_field] unless @doc_key_field.nil?
+ records << record
+ }
+ unless @update_docs
+ @db.bulk_save(records)
+ else
+ update_docs(records)
+ end
+ update_view_index
end
-
+
+ def update_docs(records)
+ if records.length > 0
+ records.each{|record|
+ doc = nil
+ begin
+ doc = @db.get(record['_id'])
+ rescue
+ end
+ record['_rev']=doc['_rev'] unless doc.nil?
+ puts record
+ @db.save_doc(record)
+ }
+ end
+ end
+
def update_view_index()
@views.each do |design,view|
@db.view("#{design}/#{view}",{"limit"=>"0"})
end
end
-
end
end