lib/elasticsearch/extensions/test/cluster.rb in elasticsearch-extensions-0.0.18 vs lib/elasticsearch/extensions/test/cluster.rb in elasticsearch-extensions-0.0.19
- old
+ new
@@ -1,8 +1,9 @@
require 'timeout'
require 'net/http'
require 'fileutils'
+require 'socket'
require 'uri'
require 'json'
require 'ansi'
STDOUT.sync = true
@@ -29,24 +30,33 @@
#
# @see Cluster#start Cluster.start
# @see Cluster#stop Cluster.stop
#
module Cluster
+ @@network_host = ENV.fetch('TEST_CLUSTER_NETWORK_HOST', 'localhost')
@@number_of_nodes = (ENV['TEST_CLUSTER_NODES'] || 2).to_i
+ @@default_cluster_name = "elasticsearch-test-#{Socket.gethostname.downcase}"
# Starts a cluster
#
# Launches the specified number of nodes in test-suitable configuration by default
# and prints information about the cluster -- unless this specific cluster is running already.
#
# Use the {Cluster#stop Cluster.stop} command with the same arguments to stop this cluster.
#
- # @option arguments [String] :command Elasticsearch command (default: `elasticsearch`).
- # @option arguments [Integer] :nodes Number of desired nodes (default: 2).
- # @option arguments [String] :cluster_name Cluster name (default: `elasticsearch_test`).
- # @option arguments [String] :port Starting port number; will be auto-incremented (default: 9250).
- # @option arguments [Integer] :timeout Timeout when starting the cluster (default: 30).
+ # @option arguments [String] :cluster_name Cluster name (default: `elasticsearch_test`)
+ # @option arguments [Integer] :nodes Number of desired nodes (default: 2)
+ # @option arguments [String] :command Elasticsearch command (default: `elasticsearch`)
+ # @option arguments [String] :port Starting port number; will be auto-incremented (default: 9250)
+ # @option arguments [String] :node_name The node name (will be appended with a number)
+ # @option arguments [String] :path_data Path to the directory to store data in
+ # @option arguments [String] :path_work Path to the directory with auxiliary files
+ # @option arguments [String] :path_logs Path to the directory with log files
+ # @option arguments [Boolean] :multicast_enabled Whether multicast is enabled (default: true)
+ # @option arguments [Integer] :timeout Timeout when starting the cluster (default: 30)
+ # @option arguments [String] :network_host The host that nodes will bind on and publish to
+ # @option arguments [Boolean] :clear Wipe out cluster content on startup (default: true)
#
# You can also use environment variables to set these options.
#
# @example Start a cluster with default configuration (2 nodes, in-memory, etc)
# Elasticsearch::Extensions::Test::Cluster.start
@@ -64,20 +74,24 @@
#
# @return Boolean
# @see Cluster#stop Cluster.stop
#
def start(arguments={})
- @@number_of_nodes = (ENV['TEST_CLUSTER_NODES'] || arguments[:nodes] || 2).to_i
+ @@number_of_nodes = ( ENV.fetch('TEST_CLUSTER_NODES', arguments[:nodes] || 2) ).to_i
- arguments[:command] ||= ENV['TEST_CLUSTER_COMMAND'] || 'elasticsearch'
- arguments[:port] ||= (ENV['TEST_CLUSTER_PORT'] || 9250).to_i
- arguments[:cluster_name] ||= (ENV['TEST_CLUSTER_NAME'] || 'elasticsearch_test').chomp
- arguments[:path_data] ||= ENV['TEST_CLUSTER_DATA'] || '/tmp'
- arguments[:es_params] ||= ENV['TEST_CLUSTER_PARAMS'] || ''
- arguments[:path_work] ||= '/tmp'
- arguments[:node_name] ||= 'node'
- arguments[:timeout] ||= (ENV['TEST_CLUSTER_TIMEOUT'] || 30).to_i
+ arguments[:command] ||= ENV.fetch('TEST_CLUSTER_COMMAND', 'elasticsearch')
+ arguments[:port] ||= (ENV.fetch('TEST_CLUSTER_PORT', 9250).to_i)
+ arguments[:cluster_name] ||= (ENV.fetch('TEST_CLUSTER_NAME', @@default_cluster_name).chomp)
+ arguments[:node_name] ||= ENV.fetch('TEST_CLUSTER_NODE_NAME', 'node')
+ arguments[:path_data] ||= ENV.fetch('TEST_CLUSTER_DATA', '/tmp/elasticsearch_test')
+ arguments[:path_work] ||= ENV.fetch('TEST_CLUSTER_TMP', '/tmp')
+ arguments[:path_logs] ||= ENV.fetch('TEST_CLUSTER_LOGS', '/var/log/elasticsearch')
+ arguments[:es_params] ||= ENV.fetch('TEST_CLUSTER_PARAMS', '')
+ arguments[:multicast_enabled] ||= ENV.fetch('TEST_CLUSTER_MULTICAST', 'true')
+ arguments[:timeout] ||= (ENV.fetch('TEST_CLUSTER_TIMEOUT', 30).to_i)
+ arguments[:network_host] ||= @@network_host
+ arguments[:clear] ||= true
# Make sure `cluster_name` is not dangerous
if arguments[:cluster_name] =~ /^[\/\\]?$/
raise ArgumentError, "The `cluster_name` parameter cannot be empty string or a slash"
end
@@ -86,39 +100,47 @@
print "[!] Elasticsearch cluster already running".ansi(:red)
wait_for_green(arguments[:port], arguments[:timeout])
return false
end
- # Wipe out data for this cluster name
- FileUtils.rm_rf "#{arguments[:path_data]}/#{arguments[:cluster_name]}"
+ # Wipe out data for this cluster name if requested
+ FileUtils.rm_rf "#{arguments[:path_data]}/#{arguments[:cluster_name]}" if arguments[:clear]
print "Starting ".ansi(:faint) +
@@number_of_nodes.to_s.ansi(:bold, :faint) +
" Elasticsearch nodes..".ansi(:faint)
pids = []
@@number_of_nodes.times do |n|
n += 1
- pid = Process.spawn <<-COMMAND
+ command = <<-COMMAND
#{arguments[:command]} \
-D es.foreground=yes \
-D es.cluster.name=#{arguments[:cluster_name]} \
-D es.node.name=#{arguments[:node_name]}-#{n} \
-D es.http.port=#{arguments[:port].to_i + (n-1)} \
-D es.path.data=#{arguments[:path_data]} \
-D es.path.work=#{arguments[:path_work]} \
+ -D es.path.logs=#{arguments[:path_logs]} \
-D es.cluster.routing.allocation.disk.threshold_enabled=false \
- -D es.network.host=0.0.0.0 \
- -D es.discovery.zen.ping.multicast.enabled=true \
- -D es.script.disable_dynamic=false \
+ -D es.network.host=#{@@network_host} \
+ -D es.discovery.zen.ping.multicast.enabled=#{arguments[:multicast_enabled]} \
+ -D es.script.inline=on \
+ -D es.script.indexed=on \
-D es.node.test=true \
+ -D es.node.testattr=test \
-D es.node.bench=true \
+ -D es.path.repo=/tmp \
+ -D es.repositories.url.allowed_urls=http://snapshot.test* \
-D es.logger.level=DEBUG \
#{arguments[:es_params]} \
> /dev/null
COMMAND
+ STDERR.puts command.gsub(/ {1,}/, ' ') if ENV['DEBUG']
+
+ pid = Process.spawn(command)
Process.detach pid
pids << pid
end
# Check for processes running
@@ -144,13 +166,14 @@
# @return Boolean
# @see Cluster#start Cluster.start
#
def stop(arguments={})
arguments[:port] ||= (ENV['TEST_CLUSTER_PORT'] || 9250).to_i
+ arguments[:network_host] ||= ENV.fetch('TEST_CLUSTER_NETWORK_HOST', @@network_host)
nodes = begin
- JSON.parse(Net::HTTP.get(URI("http://localhost:#{arguments[:port]}/_nodes/?process")))
+ JSON.parse(Net::HTTP.get(URI("http://#{arguments[:network_host]}:#{arguments[:port]}/_nodes/?process")))
rescue Exception => e
STDERR.puts "[!] Exception raised when stopping the cluster: #{e.inspect}".ansi(:red)
nil
end
@@ -159,14 +182,29 @@
pids = nodes['nodes'].map { |id, info| info['process']['id'] }
unless pids.empty?
print "\nStopping Elasticsearch nodes... ".ansi(:faint)
pids.each_with_index do |pid, i|
- begin
- print "stopped PID #{pid}. ".ansi(:green) if Process.kill 'INT', pid
- rescue Exception => e
- print "[#{e.class}] PID #{pid} not found. ".ansi(:red)
+ ['INT','KILL'].each do |signal|
+ begin
+ Process.kill signal, pid
+ rescue Exception => e
+ print "[#{e.class}] PID #{pid} not found. ".ansi(:red)
+ end
+
+ # Give the system some breathing space to finish...
+ sleep 1
+
+ # Check that pid really is dead
+ begin
+ Process.getpgid( pid )
+ # `getpgid` will raise error if pid is dead, so if we get here, try next signal.
+ next
+ rescue Errno::ESRCH
+ print "stopped PID #{pid} with #{signal} signal. ".ansi(:green)
+ break # pid is dead
+ end
end
end
puts
else
false
@@ -182,11 +220,11 @@
#
# @return Boolean
#
def running?(arguments={})
port = arguments[:on] || (ENV['TEST_CLUSTER_PORT'] || 9250).to_i
- cluster_name = arguments[:as] || ENV['TEST_CLUSTER_NAME'] || 'elasticsearch_test'
+ cluster_name = arguments[:as] || (ENV.fetch('TEST_CLUSTER_NAME', @@default_cluster_name).chomp)
if cluster_health = Timeout::timeout(0.25) { __get_cluster_health(port) } rescue nil
return cluster_health['cluster_name'] == cluster_name && \
cluster_health['number_of_nodes'] == @@number_of_nodes
end
@@ -217,22 +255,22 @@
# @api private
#
# @return Boolean
#
def __wait_for_status(status='green', port=9250, timeout=30)
- uri = URI("http://localhost:#{port}/_cluster/health?wait_for_status=#{status}")
+ uri = URI("http://#{@@network_host}:#{port}/_cluster/health?wait_for_status=#{status}")
Timeout::timeout(timeout) do
loop do
response = begin
JSON.parse(Net::HTTP.get(uri))
rescue Exception => e
- puts e.inspect if ENV['DEBUG']
+ STDERR.puts e.inspect if ENV['DEBUG']
nil
end
- puts response.inspect if ENV['DEBUG']
+ STDERR.puts response.inspect if response && ENV['DEBUG']
if response && response['status'] == status && ( @@number_of_nodes.nil? || @@number_of_nodes == response['number_of_nodes'].to_i )
__print_cluster_info(port) and break
end
@@ -247,13 +285,13 @@
# Print information about the cluster on STDOUT
#
# @api private
#
def __print_cluster_info(port)
- health = JSON.parse(Net::HTTP.get(URI("http://localhost:#{port}/_cluster/health")))
- nodes = JSON.parse(Net::HTTP.get(URI("http://localhost:#{port}/_nodes/process,http")))
- master = JSON.parse(Net::HTTP.get(URI("http://localhost:#{port}/_cluster/state")))['master_node']
+ health = JSON.parse(Net::HTTP.get(URI("http://#{@@network_host}:#{port}/_cluster/health")))
+ nodes = JSON.parse(Net::HTTP.get(URI("http://#{@@network_host}:#{port}/_nodes/process,http")))
+ master = JSON.parse(Net::HTTP.get(URI("http://#{@@network_host}:#{port}/_cluster/state")))['master_node']
puts "\n",
('-'*80).ansi(:faint),
'Cluster: '.ljust(20).ansi(:faint) + health['cluster_name'].to_s.ansi(:faint),
'Status: '.ljust(20).ansi(:faint) + health['status'].to_s.ansi(:faint),
@@ -273,10 +311,10 @@
# Tries to load cluster health information
#
# @api private
#
def __get_cluster_health(port=9250)
- uri = URI("http://localhost:#{port}/_cluster/health")
+ uri = URI("http://#{@@network_host}:#{port}/_cluster/health")
if response = Net::HTTP.get(uri) rescue nil
return JSON.parse(response)
end
end