lib/elasticsearch/extensions/test/cluster.rb in elasticsearch-extensions-0.0.18 vs lib/elasticsearch/extensions/test/cluster.rb in elasticsearch-extensions-0.0.19

- old
+ new

@@ -1,8 +1,9 @@ require 'timeout' require 'net/http' require 'fileutils' +require 'socket' require 'uri' require 'json' require 'ansi' STDOUT.sync = true @@ -29,24 +30,33 @@ # # @see Cluster#start Cluster.start # @see Cluster#stop Cluster.stop # module Cluster + @@network_host = ENV.fetch('TEST_CLUSTER_NETWORK_HOST', 'localhost') @@number_of_nodes = (ENV['TEST_CLUSTER_NODES'] || 2).to_i + @@default_cluster_name = "elasticsearch-test-#{Socket.gethostname.downcase}" # Starts a cluster # # Launches the specified number of nodes in test-suitable configuration by default # and prints information about the cluster -- unless this specific cluster is running already. # # Use the {Cluster#stop Cluster.stop} command with the same arguments to stop this cluster. # - # @option arguments [String] :command Elasticsearch command (default: `elasticsearch`). - # @option arguments [Integer] :nodes Number of desired nodes (default: 2). - # @option arguments [String] :cluster_name Cluster name (default: `elasticsearch_test`). - # @option arguments [String] :port Starting port number; will be auto-incremented (default: 9250). - # @option arguments [Integer] :timeout Timeout when starting the cluster (default: 30). 
+ # @option arguments [String] :cluster_name Cluster name (default: `elasticsearch-test-<hostname>`) + # @option arguments [Integer] :nodes Number of desired nodes (default: 2) + # @option arguments [String] :command Elasticsearch command (default: `elasticsearch`) + # @option arguments [String] :port Starting port number; will be auto-incremented (default: 9250) + # @option arguments [String] :node_name The node name (will be appended with a number) + # @option arguments [String] :path_data Path to the directory to store data in + # @option arguments [String] :path_work Path to the directory with auxiliary files + # @option arguments [String] :path_logs Path to the directory with log files + # @option arguments [Boolean] :multicast_enabled Whether multicast is enabled (default: true) + # @option arguments [Integer] :timeout Timeout when starting the cluster (default: 30) + # @option arguments [String] :network_host The host that nodes will bind on and publish to + # @option arguments [Boolean] :clear Wipe out cluster content on startup (default: true) # # You can also use environment variables to set these options. 
# # @example Start a cluster with default configuration (2 nodes, in-memory, etc) # Elasticsearch::Extensions::Test::Cluster.start @@ -64,20 +74,24 @@ # # @return Boolean # @see Cluster#stop Cluster.stop # def start(arguments={}) - @@number_of_nodes = (ENV['TEST_CLUSTER_NODES'] || arguments[:nodes] || 2).to_i + @@number_of_nodes = ( ENV.fetch('TEST_CLUSTER_NODES', arguments[:nodes] || 2) ).to_i - arguments[:command] ||= ENV['TEST_CLUSTER_COMMAND'] || 'elasticsearch' - arguments[:port] ||= (ENV['TEST_CLUSTER_PORT'] || 9250).to_i - arguments[:cluster_name] ||= (ENV['TEST_CLUSTER_NAME'] || 'elasticsearch_test').chomp - arguments[:path_data] ||= ENV['TEST_CLUSTER_DATA'] || '/tmp' - arguments[:es_params] ||= ENV['TEST_CLUSTER_PARAMS'] || '' - arguments[:path_work] ||= '/tmp' - arguments[:node_name] ||= 'node' - arguments[:timeout] ||= (ENV['TEST_CLUSTER_TIMEOUT'] || 30).to_i + arguments[:command] ||= ENV.fetch('TEST_CLUSTER_COMMAND', 'elasticsearch') + arguments[:port] ||= (ENV.fetch('TEST_CLUSTER_PORT', 9250).to_i) + arguments[:cluster_name] ||= (ENV.fetch('TEST_CLUSTER_NAME', @@default_cluster_name).chomp) + arguments[:node_name] ||= ENV.fetch('TEST_CLUSTER_NODE_NAME', 'node') + arguments[:path_data] ||= ENV.fetch('TEST_CLUSTER_DATA', '/tmp/elasticsearch_test') + arguments[:path_work] ||= ENV.fetch('TEST_CLUSTER_TMP', '/tmp') + arguments[:path_logs] ||= ENV.fetch('TEST_CLUSTER_LOGS', '/var/log/elasticsearch') + arguments[:es_params] ||= ENV.fetch('TEST_CLUSTER_PARAMS', '') + arguments[:multicast_enabled] ||= ENV.fetch('TEST_CLUSTER_MULTICAST', 'true') + arguments[:timeout] ||= (ENV.fetch('TEST_CLUSTER_TIMEOUT', 30).to_i) + arguments[:network_host] ||= @@network_host + arguments[:clear] ||= true # Make sure `cluster_name` is not dangerous if arguments[:cluster_name] =~ /^[\/\\]?$/ raise ArgumentError, "The `cluster_name` parameter cannot be empty string or a slash" end @@ -86,39 +100,47 @@ print "[!] 
Elasticsearch cluster already running".ansi(:red) wait_for_green(arguments[:port], arguments[:timeout]) return false end - # Wipe out data for this cluster name - FileUtils.rm_rf "#{arguments[:path_data]}/#{arguments[:cluster_name]}" + # Wipe out data for this cluster name if requested + FileUtils.rm_rf "#{arguments[:path_data]}/#{arguments[:cluster_name]}" if arguments[:clear] print "Starting ".ansi(:faint) + @@number_of_nodes.to_s.ansi(:bold, :faint) + " Elasticsearch nodes..".ansi(:faint) pids = [] @@number_of_nodes.times do |n| n += 1 - pid = Process.spawn <<-COMMAND + command = <<-COMMAND #{arguments[:command]} \ -D es.foreground=yes \ -D es.cluster.name=#{arguments[:cluster_name]} \ -D es.node.name=#{arguments[:node_name]}-#{n} \ -D es.http.port=#{arguments[:port].to_i + (n-1)} \ -D es.path.data=#{arguments[:path_data]} \ -D es.path.work=#{arguments[:path_work]} \ + -D es.path.logs=#{arguments[:path_logs]} \ -D es.cluster.routing.allocation.disk.threshold_enabled=false \ - -D es.network.host=0.0.0.0 \ - -D es.discovery.zen.ping.multicast.enabled=true \ - -D es.script.disable_dynamic=false \ + -D es.network.host=#{@@network_host} \ + -D es.discovery.zen.ping.multicast.enabled=#{arguments[:multicast_enabled]} \ + -D es.script.inline=on \ + -D es.script.indexed=on \ -D es.node.test=true \ + -D es.node.testattr=test \ -D es.node.bench=true \ + -D es.path.repo=/tmp \ + -D es.repositories.url.allowed_urls=http://snapshot.test* \ -D es.logger.level=DEBUG \ #{arguments[:es_params]} \ > /dev/null COMMAND + STDERR.puts command.gsub(/ {1,}/, ' ') if ENV['DEBUG'] + + pid = Process.spawn(command) Process.detach pid pids << pid end # Check for processes running @@ -144,13 +166,14 @@ # @return Boolean # @see Cluster#start Cluster.start # def stop(arguments={}) arguments[:port] ||= (ENV['TEST_CLUSTER_PORT'] || 9250).to_i + arguments[:network_host] ||= ENV.fetch('TEST_CLUSTER_NETWORK_HOST', @@network_host) nodes = begin - 
JSON.parse(Net::HTTP.get(URI("http://localhost:#{arguments[:port]}/_nodes/?process"))) + JSON.parse(Net::HTTP.get(URI("http://#{arguments[:network_host]}:#{arguments[:port]}/_nodes/?process"))) rescue Exception => e STDERR.puts "[!] Exception raised when stopping the cluster: #{e.inspect}".ansi(:red) nil end @@ -159,14 +182,29 @@ pids = nodes['nodes'].map { |id, info| info['process']['id'] } unless pids.empty? print "\nStopping Elasticsearch nodes... ".ansi(:faint) pids.each_with_index do |pid, i| - begin - print "stopped PID #{pid}. ".ansi(:green) if Process.kill 'INT', pid - rescue Exception => e - print "[#{e.class}] PID #{pid} not found. ".ansi(:red) + ['INT','KILL'].each do |signal| + begin + Process.kill signal, pid + rescue Exception => e + print "[#{e.class}] PID #{pid} not found. ".ansi(:red) + end + + # Give the system some breathing space to finish... + sleep 1 + + # Check that pid really is dead + begin + Process.getpgid( pid ) + # `getpgid` will raise error if pid is dead, so if we get here, try next signal. + next + rescue Errno::ESRCH + print "stopped PID #{pid} with #{signal} signal. 
".ansi(:green) + break # pid is dead + end end end puts else false @@ -182,11 +220,11 @@ # # @return Boolean # def running?(arguments={}) port = arguments[:on] || (ENV['TEST_CLUSTER_PORT'] || 9250).to_i - cluster_name = arguments[:as] || ENV['TEST_CLUSTER_NAME'] || 'elasticsearch_test' + cluster_name = arguments[:as] || (ENV.fetch('TEST_CLUSTER_NAME', @@default_cluster_name).chomp) if cluster_health = Timeout::timeout(0.25) { __get_cluster_health(port) } rescue nil return cluster_health['cluster_name'] == cluster_name && \ cluster_health['number_of_nodes'] == @@number_of_nodes end @@ -217,22 +255,22 @@ # @api private # # @return Boolean # def __wait_for_status(status='green', port=9250, timeout=30) - uri = URI("http://localhost:#{port}/_cluster/health?wait_for_status=#{status}") + uri = URI("http://#{@@network_host}:#{port}/_cluster/health?wait_for_status=#{status}") Timeout::timeout(timeout) do loop do response = begin JSON.parse(Net::HTTP.get(uri)) rescue Exception => e - puts e.inspect if ENV['DEBUG'] + STDERR.puts e.inspect if ENV['DEBUG'] nil end - puts response.inspect if ENV['DEBUG'] + STDERR.puts response.inspect if response && ENV['DEBUG'] if response && response['status'] == status && ( @@number_of_nodes.nil? 
|| @@number_of_nodes == response['number_of_nodes'].to_i ) __print_cluster_info(port) and break end @@ -247,13 +285,13 @@ # Print information about the cluster on STDOUT # # @api private # def __print_cluster_info(port) - health = JSON.parse(Net::HTTP.get(URI("http://localhost:#{port}/_cluster/health"))) - nodes = JSON.parse(Net::HTTP.get(URI("http://localhost:#{port}/_nodes/process,http"))) - master = JSON.parse(Net::HTTP.get(URI("http://localhost:#{port}/_cluster/state")))['master_node'] + health = JSON.parse(Net::HTTP.get(URI("http://#{@@network_host}:#{port}/_cluster/health"))) + nodes = JSON.parse(Net::HTTP.get(URI("http://#{@@network_host}:#{port}/_nodes/process,http"))) + master = JSON.parse(Net::HTTP.get(URI("http://#{@@network_host}:#{port}/_cluster/state")))['master_node'] puts "\n", ('-'*80).ansi(:faint), 'Cluster: '.ljust(20).ansi(:faint) + health['cluster_name'].to_s.ansi(:faint), 'Status: '.ljust(20).ansi(:faint) + health['status'].to_s.ansi(:faint), @@ -273,10 +311,10 @@ # Tries to load cluster health information # # @api private # def __get_cluster_health(port=9250) - uri = URI("http://localhost:#{port}/_cluster/health") + uri = URI("http://#{@@network_host}:#{port}/_cluster/health") if response = Net::HTTP.get(uri) rescue nil return JSON.parse(response) end end