lib/logstash/outputs/elasticsearch/protocol.rb in logstash-output-elasticsearch-0.1.1 vs lib/logstash/outputs/elasticsearch/protocol.rb in logstash-output-elasticsearch-0.1.5

- old
+ new

@@ -49,87 +49,46 @@ DEFAULT_OPTIONS = { :port => 9200 } def initialize(options={}) - require "ftw" super require "elasticsearch" # gem 'elasticsearch-ruby' + # manticore http transport + require "elasticsearch/transport/transport/http/manticore" @options = DEFAULT_OPTIONS.merge(options) @client = client end def build_client(options) - client = Elasticsearch::Client.new( - :host => [options[:host], options[:port]].join(":") - ) + uri = "#{options[:protocol]}://#{options[:host]}:#{options[:port]}" - # Use FTW to do indexing requests, for now, until we - # can identify and resolve performance problems of elasticsearch-ruby - @bulk_url = "http://#{options[:host]}:#{options[:port]}/_bulk" - @agent = FTW::Agent.new + client_options = { + :host => [uri], + :transport_options => options[:client_settings] + } + client_options[:transport_class] = ::Elasticsearch::Transport::Transport::HTTP::Manticore + client_options[:ssl] = client_options[:transport_options].delete(:ssl) - return client - end - - if ENV["BULK"] == "esruby" - def bulk(actions) - bulk_esruby(actions) + if options[:user] && options[:password] then + token = Base64.strict_encode64(options[:user] + ":" + options[:password]) + client_options[:headers] = { "Authorization" => "Basic #{token}" } end - else - def bulk(actions) - bulk_ftw(actions) - end + + Elasticsearch::Client.new client_options end - def bulk_esruby(actions) + def bulk(actions) @client.bulk(:body => actions.collect do |action, args, source| if source next [ { action => args }, source ] else next { action => args } end end.flatten) - end # def bulk_esruby + end # def bulk - # Avoid creating a new string for newline every time - NEWLINE = "\n".freeze - def bulk_ftw(actions) - body = actions.collect do |action, args, source| - header = { action => args } - if source - next [ LogStash::Json.dump(header), NEWLINE, LogStash::Json.dump(source), NEWLINE ] - else - next [ LogStash::Json.dump(header), NEWLINE ] - end - end.flatten.join("") - begin - response = @agent.post!(@bulk_url, :body => body) - rescue EOFError - @logger.warn("EOF while writing request or reading response header from elasticsearch", :host => @host, :port => @port) - raise - end - - # Consume the body for error checking - # This will also free up the connection for reuse. - response_body = "" - begin - response.read_body { |chunk| response_body += chunk } - rescue EOFError - @logger.warn("EOF while reading response body from elasticsearch", - :url => @bulk_url) - raise - end - - if response.status != 200 - @logger.error("Error writing (bulk) to elasticsearch", - :response => response, :response_body => response_body, - :request_body => body) - raise "Non-OK response code from Elasticsearch: #{response.status}" - end - end # def bulk_ftw - def template_exists?(name) @client.indices.get_template(:name => name) return true rescue Elasticsearch::Transport::Transport::Errors::NotFound return false @@ -179,20 +138,44 @@ return @settings end def hosts(options) - if options[:port].to_s =~ /^\d+-\d+$/ - # port ranges are 'host[port1-port2]' according to - # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/ - # However, it seems to only query the first port. - # So generate our own list of unicast hosts to scan. - range = Range.new(*options[:port].split("-")) - return range.collect { |p| "#{options[:host]}:#{p}" }.join(",") + # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/ + result = Array.new + if options[:host].class == Array + options[:host].each do |host| + if host.to_s =~ /^.+:.+$/ + # For host in format: host:port, ignore options[:port] + result << host + else + if options[:port].to_s =~ /^\d+-\d+$/ + # port ranges are 'host[port1-port2]' + result << Range.new(*options[:port].split("-")).collect { |p| "#{host}:#{p}" } + else + result << "#{host}:#{options[:port]}" + end + end + end else - return "#{options[:host]}:#{options[:port]}" + if options[:host].to_s =~ /^.+:.+$/ + # For host in format: host:port, ignore options[:port] + result << options[:host] + else + if options[:port].to_s =~ /^\d+-\d+$/ + # port ranges are 'host[port1-port2]' according to + # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/ + # However, it seems to only query the first port. + # So generate our own list of unicast hosts to scan. + range = Range.new(*options[:port].split("-")) + result << range.collect { |p| "#{options[:host]}:#{p}" } + else + result << "#{options[:host]}:#{options[:port]}" + end + end end + result.flatten.join(",") end # def hosts def build_client(options) nodebuilder = org.elasticsearch.node.NodeBuilder.nodeBuilder return nodebuilder.settings(@settings).node.client @@ -266,6 +249,5 @@ class Bulk; end class Index; end class Delete; end end end -