lib/logstash/outputs/elasticsearch.rb in logstash-lite-0.2.20110206003603 vs lib/logstash/outputs/elasticsearch.rb in logstash-lite-0.2.20110329105411

- old
+ new

@@ -1,9 +1,10 @@ require "em-http-request" require "logstash/namespace" require "logstash/outputs/amqp" require "logstash/outputs/base" +require "cgi" class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base public def register @pending = [] @@ -39,60 +40,83 @@ }, # "properties" }, # index map for this index type. }, # "settings" } # ES Index + #puts :waiting + puts @esurl.to_s + #sleep 10 indexurl = @esurl.to_s indexmap_http = EventMachine::HttpRequest.new(indexurl) indexmap_req = indexmap_http.put :body => indexmap.to_json indexmap_req.callback do @logger.info(["Done configuring index", indexurl, indexmap]) ready(params) end indexmap_req.errback do - @logger.warn(["Failure configuring index", @esurl.to_s, indexmap]) + @logger.warn(["Failure configuring index (http failed to connect?)", + @esurl.to_s, indexmap]) + @logger.warn([indexmap_req]) + #sleep 30 raise "Failure configuring index: #{@esurl.to_s}" + end end # def register public def ready(params) - case params["method"] + method = params.delete("method") + case method when "http" @logger.debug "ElasticSearch using http with URL #{@url.to_s}" @http = EventMachine::HttpRequest.new(@url.to_s) @callback = self.method(:receive_http) when "river" - params["port"] ||= 5672 - auth = "#{params["user"] or "guest"}:#{params["pass"] or "guest"}" - mq_url = URI::parse("amqp://#{auth}@#{params["host"]}:#{params["port"]}/queue/#{params["queue"]}?durable=1") + river_type = params.delete("type") || "rabbitmq" + amqp_host = params.delete("host") || 'localhost' + amqp_port = params.delete("port") || 5672 + amqp_exchange_type = params.delete("exchange_type") || "direct" + amqp_queue_name = params.delete("queue") || "es" + amqp_exchange_name = params.delete("exchange") || amqp_queue_name + amqp_exchange_durable = (params["durable"] || "false") =~ /^[ty1]/ + amqp_user = params.delete("user") or "guest" + amqp_pass = params.delete("pass") or "guest" + amqp_vhost = params.delete("vhost") || "/" + vhost_str = (amqp_vhost == "/") ? "" : "/#{amqp_vhost}" + qs = params.map {|k,v| "#{CGI.escape(k)}=#{CGI.escape(v)}"}.join("&") + mq_url = URI::parse("amqp://#{amqp_user}:#{amqp_pass}@#{amqp_host}:#{amqp_port}#{vhost_str}/#{amqp_exchange_type}/#{amqp_exchange_name}?#{qs}") @mq = LogStash::Outputs::Amqp.new(mq_url.to_s) @mq.register @callback = self.method(:receive_river) em_url = URI.parse("http://#{@url.host}:#{@url.port}/_river/logstash#{@url.path.tr("/", "_")}/_meta") unused, @es_index, @es_type = @url.path.split("/", 3) - river_config = {"type" => params["type"], - params["type"] => {"host" => params["host"], - "user" => params["user"], - "port" => params["port"], - "pass" => params["pass"], - "vhost" => params["vhost"], - "queue" => params["queue"], - "exchange" => params["queue"], - }, - "index" => {"bulk_size" => 100, - "bulk_timeout" => "10ms", - }, - } + river_config = { + "type" => river_type, + river_type => { + "host" => amqp_host, + "user" => amqp_user, + "port" => amqp_port, + "pass" => amqp_pass, + "vhost" => amqp_vhost, + "queue" => amqp_queue_name, + "exchange" => amqp_exchange_name, + "exchange_durable" => amqp_exchange_durable ? "true" : "false", + "exchange_type" => amqp_exchange_type, + }, + "index" => { + "bulk_size" => 100, + "bulk_timeout" => "10ms", + }, + } @logger.debug(["ElasticSearch using river", river_config]) http_setup = EventMachine::HttpRequest.new(em_url.to_s) req = http_setup.put :body => river_config.to_json req.errback do @logger.warn "Error setting up river: #{req.response}" end @callback = self.method(:receive_river) - else raise "unknown elasticsearch method #{params["method"].inspect}" + else raise "unknown elasticsearch method #{method.inspect}" end #receive(LogStash::Event.new({ #"@source" => "@logstashinit", #"@type" => "@none",