lib/logstash/outputs/elasticsearch/protocol.rb in logstash-output-elasticsearch-0.1.1 vs lib/logstash/outputs/elasticsearch/protocol.rb in logstash-output-elasticsearch-0.1.5
- old
+ new
@@ -49,87 +49,46 @@
DEFAULT_OPTIONS = {
:port => 9200
}
def initialize(options={})
- require "ftw"
super
require "elasticsearch" # gem 'elasticsearch-ruby'
+ # manticore http transport
+ require "elasticsearch/transport/transport/http/manticore"
@options = DEFAULT_OPTIONS.merge(options)
@client = client
end
def build_client(options)
- client = Elasticsearch::Client.new(
- :host => [options[:host], options[:port]].join(":")
- )
+ uri = "#{options[:protocol]}://#{options[:host]}:#{options[:port]}"
- # Use FTW to do indexing requests, for now, until we
- # can identify and resolve performance problems of elasticsearch-ruby
- @bulk_url = "http://#{options[:host]}:#{options[:port]}/_bulk"
- @agent = FTW::Agent.new
+ client_options = {
+ :host => [uri],
+ :transport_options => options[:client_settings]
+ }
+ client_options[:transport_class] = ::Elasticsearch::Transport::Transport::HTTP::Manticore
+ client_options[:ssl] = client_options[:transport_options].delete(:ssl)
- return client
- end
-
- if ENV["BULK"] == "esruby"
- def bulk(actions)
- bulk_esruby(actions)
+ if options[:user] && options[:password] then
+ token = Base64.strict_encode64(options[:user] + ":" + options[:password])
+ client_options[:headers] = { "Authorization" => "Basic #{token}" }
end
- else
- def bulk(actions)
- bulk_ftw(actions)
- end
+
+ Elasticsearch::Client.new client_options
end
- def bulk_esruby(actions)
+ def bulk(actions)
@client.bulk(:body => actions.collect do |action, args, source|
if source
next [ { action => args }, source ]
else
next { action => args }
end
end.flatten)
- end # def bulk_esruby
+ end # def bulk
- # Avoid creating a new string for newline every time
- NEWLINE = "\n".freeze
- def bulk_ftw(actions)
- body = actions.collect do |action, args, source|
- header = { action => args }
- if source
- next [ LogStash::Json.dump(header), NEWLINE, LogStash::Json.dump(source), NEWLINE ]
- else
- next [ LogStash::Json.dump(header), NEWLINE ]
- end
- end.flatten.join("")
- begin
- response = @agent.post!(@bulk_url, :body => body)
- rescue EOFError
- @logger.warn("EOF while writing request or reading response header from elasticsearch", :host => @host, :port => @port)
- raise
- end
-
- # Consume the body for error checking
- # This will also free up the connection for reuse.
- response_body = ""
- begin
- response.read_body { |chunk| response_body += chunk }
- rescue EOFError
- @logger.warn("EOF while reading response body from elasticsearch",
- :url => @bulk_url)
- raise
- end
-
- if response.status != 200
- @logger.error("Error writing (bulk) to elasticsearch",
- :response => response, :response_body => response_body,
- :request_body => body)
- raise "Non-OK response code from Elasticsearch: #{response.status}"
- end
- end # def bulk_ftw
-
def template_exists?(name)
@client.indices.get_template(:name => name)
return true
rescue Elasticsearch::Transport::Transport::Errors::NotFound
return false
@@ -179,20 +138,44 @@
return @settings
end
def hosts(options)
- if options[:port].to_s =~ /^\d+-\d+$/
- # port ranges are 'host[port1-port2]' according to
- # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
- # However, it seems to only query the first port.
- # So generate our own list of unicast hosts to scan.
- range = Range.new(*options[:port].split("-"))
- return range.collect { |p| "#{options[:host]}:#{p}" }.join(",")
+ # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
+ result = Array.new
+ if options[:host].class == Array
+ options[:host].each do |host|
+ if host.to_s =~ /^.+:.+$/
+ # For host in format: host:port, ignore options[:port]
+ result << host
+ else
+ if options[:port].to_s =~ /^\d+-\d+$/
+ # port ranges are 'host[port1-port2]'
+ result << Range.new(*options[:port].split("-")).collect { |p| "#{host}:#{p}" }
+ else
+ result << "#{host}:#{options[:port]}"
+ end
+ end
+ end
else
- return "#{options[:host]}:#{options[:port]}"
+ if options[:host].to_s =~ /^.+:.+$/
+ # For host in format: host:port, ignore options[:port]
+ result << options[:host]
+ else
+ if options[:port].to_s =~ /^\d+-\d+$/
+ # port ranges are 'host[port1-port2]' according to
+ # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
+ # However, it seems to only query the first port.
+ # So generate our own list of unicast hosts to scan.
+ range = Range.new(*options[:port].split("-"))
+ result << range.collect { |p| "#{options[:host]}:#{p}" }
+ else
+ result << "#{options[:host]}:#{options[:port]}"
+ end
+ end
end
+ result.flatten.join(",")
end # def hosts
def build_client(options)
nodebuilder = org.elasticsearch.node.NodeBuilder.nodeBuilder
return nodebuilder.settings(@settings).node.client
@@ -266,6 +249,5 @@
class Bulk; end
class Index; end
class Delete; end
end
end
-