lib/logstash/outputs/elasticsearch.rb in logstash-lite-0.2.20101222161646 vs lib/logstash/outputs/elasticsearch.rb in logstash-lite-0.2.20110112115019

- old
+ new

@@ -1,38 +1,80 @@ -require "logstash/outputs/base" -require "logstash/outputs/amqp" require "em-http-request" +require "logstash/namespace" +require "logstash/outputs/amqp" +require "logstash/outputs/base" class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base - def initialize(url, config={}, &block) - super - end - + public def register + @pending = [] # Port? # Authentication? - @httpurl = @url.clone - @httpurl.scheme = "http" + @esurl = @url.clone + @esurl.scheme = "http" + @esurl.path = "/" + @url.path.split("/")[1] defaults = {"method" => "http"} params = defaults.merge(@urlopts) + # Describe this index to elasticsearch + indexmap = { + # The name of the index + "settings" => { + @url.path.split("/")[-1] => { + "mappings" => { + "@source" => { "type" => "string" }, + "@source_host" => { "type" => "string" }, + "@source_path" => { "type" => "string" }, + "@timestamp" => { "type" => "date" }, + "@tags" => { "type" => "string" }, + "@message" => { "type" => "string" }, + + # TODO(sissel): Hack for now until this bug is resolved: + # https://github.com/elasticsearch/elasticsearch/issues/issue/604 + "@fields" => { + "type" => "object", + "properties" => { + "HOSTNAME" => { "type" => "string" }, + }, + }, # "@fields" + }, # "properties" + }, # index map for this index type. + }, # "settings" + } # ES Index + + indexurl = @esurl.to_s + indexmap_http = EventMachine::HttpRequest.new(indexurl) + indexmap_req = indexmap_http.put :body => indexmap.to_json + indexmap_req.callback do + @logger.info(["Done configuring index", indexurl, indexmap]) + ready(params) + end + indexmap_req.errback do + @logger.warn(["Failure configuring index", @esurl.to_s, indexmap]) + end + end # def register + + public + def ready(params) case params["method"] when "http" - @logger.debug "ElasticSearch using http with URL #{@httpurl.to_s}" - @http = EventMachine::HttpRequest.new(@httpurl.to_s) + @logger.debug "ElasticSearch using http with URL #{@url.to_s}" + @http = EventMachine::HttpRequest.new(@url.to_s) @callback = self.method(:receive_http) when "river" - mq_url = URI::parse("amqp://#{params["host"]}/queue/#{params["queue"]}?durable=1") + params["port"] ||= 5672 + mq_url = URI::parse("amqp://#{params["host"]}:#{params["port"]}/queue/#{params["queue"]}?durable=1") @mq = LogStash::Outputs::Amqp.new(mq_url.to_s) @mq.register @callback = self.method(:receive_river) - em_url = URI.parse("http://#{@httpurl.host}:#{@httpurl.port}/_river/logstash#{@httpurl.path.tr("/", "_")}/_meta") - unused, @es_index, @es_type = @httpurl.path.split("/", 3) + em_url = URI.parse("http://#{@url.host}:#{@url.port}/_river/logstash#{@url.path.tr("/", "_")}/_meta") + unused, @es_index, @es_type = @url.path.split("/", 3) river_config = {"type" => params["type"], params["type"] => {"host" => params["host"], "user" => params["user"], + "port" => params["port"], "pass" => params["pass"], "vhost" => params["vhost"], "queue" => params["queue"], "exchange" => params["queue"], }, @@ -44,28 +86,54 @@ http_setup = EventMachine::HttpRequest.new(em_url.to_s) req = http_setup.put :body => river_config.to_json req.errback do @logger.warn "Error setting up river: #{req.response}" end + @callback = self.method(:receive_river) else raise "unknown elasticsearch method #{params["method"].inspect}" end - end # def register + receive(LogStash::Event.new({ + "@source" => "@logstashinit", + "@type" => "@none", + "@message" => "Starting logstash output to elasticsearch", + "@fields" => { + "HOSTNAME" => Socket.gethostname + }, + })) + + pending = @pending + @pending = [] + pending.each do |event| + receive(event) + end + end # def ready + + public def receive(event) - @callback.call(event) + if @callback + @callback.call(event) + else + @pending << event + end end # def receive - def receive_http(event) + public + def receive_http(event, tries=5) req = @http.post :body => event.to_json req.errback do - $stderr.puts "Request to index to #{@httpurl.to_s} failed. Event was #{event.to_s}" + $stderr.puts "Request to index to #{@url.to_s} failed (will retry, #{tries} tries left). Event was #{event.to_s}" + EventMachine::add_timer(2) do + receive_http(event, tries - 1) + end end end # def receive_http + public def receive_river(event) # bulk format; see http://www.elasticsearch.com/docs/elasticsearch/river/rabbitmq/ index_message = {"index" => {"_index" => @es_index, "_type" => @es_type}}.to_json + "\n" #index_message += {@es_type => event.to_hash}.to_json + "\n" index_message += event.to_hash.to_json + "\n" @mq.receive_raw(index_message) end # def receive_river -end # class LogStash::Outputs::Websocket +end # class LogStash::Outputs::Elasticsearch