require "logstash/outputs/base"
require "logstash/outputs/amqp"
require "em-http-request"

# Output that indexes events into ElasticSearch.
#
# Two delivery methods are supported, selected by the "method" URL option
# (default "http"):
#
# * "http"  - every event is POSTed as JSON to the ElasticSearch URL.
# * "river" - a river is registered with ElasticSearch via an HTTP PUT, and
#             every event is published in bulk format through a
#             LogStash::Outputs::Amqp output.
#
# NOTE(review): @url, @urlopts and @logger are not assigned in this file;
# presumably they are provided by LogStash::Outputs::Base - confirm.
class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
  # Forwards all arguments (and the block) unchanged to the base class.
  def initialize(url, config={}, &block)
    super
  end

  # Prepares the output for use according to the "method" URL option and
  # stores the matching receive_* method in @callback.
  #
  # Raises RuntimeError when the method is neither "http" nor "river".
  def register
    # Open questions carried over from the original author:
    # Port?
    # Authentication?

    # Work on a copy so that forcing the scheme does not modify @url.
    @httpurl = @url.clone
    @httpurl.scheme = "http"
    defaults = {"method" => "http"}
    params = defaults.merge(@urlopts)

    case params["method"]
    when "http"
      @logger.debug "ElasticSearch using http with URL #{@httpurl}"
      @http = EventMachine::HttpRequest.new(@httpurl.to_s)
      @callback = self.method(:receive_http)
    when "river"
      # Events are published to this queue through the AMQP output.
      mq_url = URI::parse("amqp://#{params["host"]}/queue/#{params["queue"]}?durable=1")
      @mq = LogStash::Outputs::Amqp.new(mq_url.to_s)
      @mq.register
      @callback = self.method(:receive_river)

      # The river name is derived from the URL path with "/" replaced by "_".
      em_url = URI.parse("http://#{@httpurl.host}:#{@httpurl.port}/_river/logstash#{@httpurl.path.tr("/", "_")}/_meta")

      # The path starts with "/", so the first element of the split is
      # discarded; the next two are kept as index and type.
      _, @es_index, @es_type = @httpurl.path.split("/", 3)

      river_config = {
        "type" => params["type"],
        params["type"] => {
          "host" => params["host"],
          "user" => params["user"],
          "pass" => params["pass"],
          "vhost" => params["vhost"],
          "queue" => params["queue"],
          # The exchange is given the same name as the queue.
          "exchange" => params["queue"],
        },
        "index" => {
          "bulk_size" => 100,
          "bulk_timeout" => "10ms",
        },
      }
      @logger.debug(["ElasticSearch using river", river_config])

      http_setup = EventMachine::HttpRequest.new(em_url.to_s)
      req = http_setup.put :body => river_config.to_json
      req.errback do
        @logger.warn "Error setting up river: #{req.response}"
      end
    else
      raise "unknown elasticsearch method #{params["method"].inspect}"
    end
  end # def register

  # Hands the event to whichever receive_* method #register selected.
  def receive(event)
    @callback.call(event)
  end # def receive

  # POSTs the event as JSON to the ElasticSearch URL. A failed request is
  # reported through @logger, like the other diagnostics in this class.
  def receive_http(event)
    req = @http.post :body => event.to_json
    req.errback do
      @logger.warn "Request to index to #{@httpurl} failed. Event was #{event}"
    end
  end # def receive_http

  # Publishes the event to the AMQP output as a two-line bulk message: an
  # "index" action line naming the index and type, then the event itself,
  # each terminated by a newline.
  def receive_river(event)
    # bulk format; see http://www.elasticsearch.com/docs/elasticsearch/river/rabbitmq/
    action = {"index" => {"_index" => @es_index, "_type" => @es_type}}
    # Alternative kept from the original (disabled): wrap the document as
    # {@es_type => event.to_hash} instead of sending the bare hash.
    index_message = "#{action.to_json}\n#{event.to_hash.to_json}\n"
    @mq.receive_raw(index_message)
  end # def receive_river
end # class LogStash::Outputs::Elasticsearch