lib/fluent/plugin/out_couch.rb in fluent-plugin-couch-0.2.0 vs lib/fluent/plugin/out_couch.rb in fluent-plugin-couch-0.3.0
- old
+ new
@@ -1,75 +1,42 @@
-
-module Couch
- require 'net/http'
- class Server
- def initialize(host, port, options = nil)
- @host = host
- @port = port
- @options = options
- end
-
- def delete(uri)
- request(Net::HTTP::Delete.new(uri))
- end
-
- def get(uri)
- request(Net::HTTP::Get.new(uri))
- end
-
- def put(uri, json)
- req = Net::HTTP::Put.new(uri)
- req["content-type"] = "application/json"
- req.body = json
- request(req)
- end
-
- def post(uri, json)
- req = Net::HTTP::Post.new(uri)
- req["content-type"] = "application/json"
- req.body = json
- request(req)
- end
-
- def request(req)
- res = Net::HTTP.start(@host, @port) { |http|http.request(req) }
- res
- end
-
- end
-end
-
module Fluent
class CouchOutput < BufferedOutput
-
include SetTagKeyMixin
config_set_default :include_tag_key, false
include SetTimeKeyMixin
config_set_default :include_time_key, true
Fluent::Plugin.register_output('couch', self)
-
- config_param :database, :string => nil do |val|
- '/'+val
- end
+
+ config_param :database, :string
+
config_param :host, :string, :default => 'localhost'
config_param :port, :string, :default => '5984'
+ config_param :refresh_view_index , :string, :default => nil
+
def initialize
super
+ Encoding.default_internal = 'UTF-8'
require 'msgpack'
+ require 'couchrest'
end
def configure(conf)
super
end
def start
super
- @couch = Couch::Server.new(@host, @port)
- @couch.put(@database, "")
+ @db = CouchRest.database("http://#{@host}:#{@port}/#{@database}")
+ @views = []
+ unless @refresh_view_index.nil?
+ @db.get("_design/#{@refresh_view_index}")['views'].each do |view_name,func|
+ @views.push([@refresh_view_index,view_name])
+ end
+ end
end
def shutdown
super
end
@@ -79,13 +46,17 @@
end
def write(chunk)
records = []
chunk.msgpack_each {|record| records << record }
- @couch.post(@database+'/_bulk_docs', {"all_or_nothing"=>true, "docs"=>records}.to_json)
+ @db.bulk_save(records)
+ update_view_index()
+ end
- #for record in records
- # @couch.post(@database,record.to_json)
- #end
+ def update_view_index()
+ @views.each do |design,view|
+ @db.view("#{design}/#{view}",{"limit"=>"0"})
+ end
end
+
end
end