lib/logstash/outputs/elasticsearch.rb in logstash-lite-0.2.20110206003603 vs lib/logstash/outputs/elasticsearch.rb in logstash-lite-0.2.20110329105411
- old
+ new
@@ -1,9 +1,10 @@
require "em-http-request"
require "logstash/namespace"
require "logstash/outputs/amqp"
require "logstash/outputs/base"
+require "cgi"
class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
public
def register
@pending = []
@@ -39,60 +40,83 @@
}, # "properties"
}, # index map for this index type.
}, # "settings"
} # ES Index
+ #puts :waiting
+ puts @esurl.to_s
+ #sleep 10
indexurl = @esurl.to_s
indexmap_http = EventMachine::HttpRequest.new(indexurl)
indexmap_req = indexmap_http.put :body => indexmap.to_json
indexmap_req.callback do
@logger.info(["Done configuring index", indexurl, indexmap])
ready(params)
end
indexmap_req.errback do
- @logger.warn(["Failure configuring index", @esurl.to_s, indexmap])
+ @logger.warn(["Failure configuring index (http failed to connect?)",
+ @esurl.to_s, indexmap])
+ @logger.warn([indexmap_req])
+ #sleep 30
raise "Failure configuring index: #{@esurl.to_s}"
+
end
end # def register
public
def ready(params)
- case params["method"]
+ method = params.delete("method")
+ case method
when "http"
@logger.debug "ElasticSearch using http with URL #{@url.to_s}"
@http = EventMachine::HttpRequest.new(@url.to_s)
@callback = self.method(:receive_http)
when "river"
- params["port"] ||= 5672
- auth = "#{params["user"] or "guest"}:#{params["pass"] or "guest"}"
- mq_url = URI::parse("amqp://#{auth}@#{params["host"]}:#{params["port"]}/queue/#{params["queue"]}?durable=1")
+ river_type = params.delete("type") || "rabbitmq"
+ amqp_host = params.delete("host") || 'localhost'
+ amqp_port = params.delete("port") || 5672
+ amqp_exchange_type = params.delete("exchange_type") || "direct"
+ amqp_queue_name = params.delete("queue") || "es"
+ amqp_exchange_name = params.delete("exchange") || amqp_queue_name
+ amqp_exchange_durable = (params["durable"] || "false") =~ /^[ty1]/
+ amqp_user = params.delete("user") or "guest"
+ amqp_pass = params.delete("pass") or "guest"
+ amqp_vhost = params.delete("vhost") || "/"
+ vhost_str = (amqp_vhost == "/") ? "" : "/#{amqp_vhost}"
+ qs = params.map {|k,v| "#{CGI.escape(k)}=#{CGI.escape(v)}"}.join("&")
+ mq_url = URI::parse("amqp://#{amqp_user}:#{amqp_pass}@#{amqp_host}:#{amqp_port}#{vhost_str}/#{amqp_exchange_type}/#{amqp_exchange_name}?#{qs}")
@mq = LogStash::Outputs::Amqp.new(mq_url.to_s)
@mq.register
@callback = self.method(:receive_river)
em_url = URI.parse("http://#{@url.host}:#{@url.port}/_river/logstash#{@url.path.tr("/", "_")}/_meta")
unused, @es_index, @es_type = @url.path.split("/", 3)
- river_config = {"type" => params["type"],
- params["type"] => {"host" => params["host"],
- "user" => params["user"],
- "port" => params["port"],
- "pass" => params["pass"],
- "vhost" => params["vhost"],
- "queue" => params["queue"],
- "exchange" => params["queue"],
- },
- "index" => {"bulk_size" => 100,
- "bulk_timeout" => "10ms",
- },
- }
+ river_config = {
+ "type" => river_type,
+ river_type => {
+ "host" => amqp_host,
+ "user" => amqp_user,
+ "port" => amqp_port,
+ "pass" => amqp_pass,
+ "vhost" => amqp_vhost,
+ "queue" => amqp_queue_name,
+ "exchange" => amqp_exchange_name,
+ "exchange_durable" => amqp_exchange_durable ? "true" : "false",
+ "exchange_type" => amqp_exchange_type,
+ },
+ "index" => {
+ "bulk_size" => 100,
+ "bulk_timeout" => "10ms",
+ },
+ }
@logger.debug(["ElasticSearch using river", river_config])
http_setup = EventMachine::HttpRequest.new(em_url.to_s)
req = http_setup.put :body => river_config.to_json
req.errback do
@logger.warn "Error setting up river: #{req.response}"
end
@callback = self.method(:receive_river)
- else raise "unknown elasticsearch method #{params["method"].inspect}"
+ else raise "unknown elasticsearch method #{method.inspect}"
end
#receive(LogStash::Event.new({
#"@source" => "@logstashinit",
#"@type" => "@none",