lib/elasticsearch/extensions/test/cluster.rb in elasticsearch-extensions-0.0.21 vs lib/elasticsearch/extensions/test/cluster.rb in elasticsearch-extensions-0.0.22

- old
+ new

@@ -19,311 +19,570 @@ module Elasticsearch module Extensions module Test - # A convenience Ruby class for starting and stopping a separate testing in-memory cluster, - # to not depend on -- and not mess up -- <localhost:9200>. + # A convenience Ruby class for starting and stopping an Elasticsearch cluster, + # eg. for integration tests # # @example Start a cluster with default configuration # require 'elasticsearch/extensions/test/cluster' - # Elasticsearch::Extensions::Test::Cluster.start + # Elasticsearch::Extensions::Test::Cluster::Cluster.new.start # - # @see Cluster#start Cluster.start - # @see Cluster#stop Cluster.stop + # @see Cluster#initialize # module Cluster - @@network_host = ENV.fetch('TEST_CLUSTER_NETWORK_HOST', 'localhost') - @@number_of_nodes = (ENV['TEST_CLUSTER_NODES'] || 2).to_i - @@default_cluster_name = "elasticsearch-test-#{Socket.gethostname.downcase}" # Starts a cluster # - # Launches the specified number of nodes in test-suitable configuration by default - # and prints information about the cluster -- unless this specific cluster is running already. + # @see Cluster#start # - # Use the {Cluster#stop Cluster.stop} command with the same arguments to stop this cluster. + def start(arguments={}) + Cluster.new(arguments).start + end + + # Stops a cluster # - # @option arguments [String] :cluster_name Cluster name (default: `elasticsearch_test`) - # @option arguments [Integer] :nodes Number of desired nodes (default: 2) - # @option arguments [String] :command Elasticsearch command (default: `elasticsearch`) - # @option arguments [String] :port Starting port number; will be auto-incremented (default: 9250) - # @option arguments [String] :node_name The node name (will be appended with a number) - # @option arguments [String] :path_data Path to the directory to store data in - # @option arguments [String] :path_work Path to the directory with auxiliary files - # @option arguments [String] :path_logs Path to the directory with log files - # @option arguments [Boolean] :multicast_enabled Whether multicast is enabled (default: true) - # @option arguments [Integer] :timeout Timeout when starting the cluster (default: 30) - # @option arguments [String] :network_host The host that nodes will bind on and publish to - # @option arguments [Boolean] :clear_cluster Wipe out cluster content on startup (default: true) + # @see Cluster#stop # - # You can also use environment variables to set these options. + def stop(arguments={}) + Cluster.new(arguments).stop + end + + # Returns true when a specific test node is running within the cluster # - # @example Start a cluster with default configuration (2 nodes, in-memory, etc) - # Elasticsearch::Extensions::Test::Cluster.start + # @see Cluster#running? # - # @example Start a cluster with a custom configuration - # Elasticsearch::Extensions::Test::Cluster.start \ - # cluster_name: 'my-cluster', - # nodes: 3, - # node_name: 'my-node', - # port: 9350 + def running?(arguments={}) + Cluster.new(arguments).running? + end + + # Waits until the cluster is green and prints information # - # @example Start a cluster with a different Elasticsearch version - # Elasticsearch::Extensions::Test::Cluster.start \ - # command: "/usr/local/Cellar/elasticsearch/1.0.0.Beta2/bin/elasticsearch" + # @see Cluster#wait_for_green # - # @return Boolean - # @see Cluster#stop Cluster.stop - # - def start(arguments={}) - @@number_of_nodes = ( ENV.fetch('TEST_CLUSTER_NODES', arguments[:nodes] || 2) ).to_i + def wait_for_green(arguments={}) + Cluster.new(arguments).wait_for_green + end - arguments[:command] ||= ENV.fetch('TEST_CLUSTER_COMMAND', 'elasticsearch') - arguments[:port] ||= (ENV.fetch('TEST_CLUSTER_PORT', 9250).to_i) - arguments[:cluster_name] ||= (ENV.fetch('TEST_CLUSTER_NAME', @@default_cluster_name).chomp) - arguments[:node_name] ||= ENV.fetch('TEST_CLUSTER_NODE_NAME', 'node') - arguments[:path_data] ||= ENV.fetch('TEST_CLUSTER_DATA', '/tmp/elasticsearch_test') - arguments[:path_work] ||= ENV.fetch('TEST_CLUSTER_TMP', '/tmp') - arguments[:path_logs] ||= ENV.fetch('TEST_CLUSTER_LOGS', '/tmp/log/elasticsearch') - arguments[:es_params] ||= ENV.fetch('TEST_CLUSTER_PARAMS', '') - arguments[:multicast_enabled] ||= ENV.fetch('TEST_CLUSTER_MULTICAST', 'true') - arguments[:timeout] ||= (ENV.fetch('TEST_CLUSTER_TIMEOUT', 30).to_i) - arguments[:network_host] ||= @@network_host + module_function :start, :stop, :running?, :wait_for_green - clear_cluster = !!arguments[:clear_cluster] || (ENV.fetch('TEST_CLUSTER_CLEAR', 'true') != 'false') + class Cluster + attr_reader :arguments - # Make sure `cluster_name` is not dangerous - if arguments[:cluster_name] =~ /^[\/\\]?$/ - raise ArgumentError, "The `cluster_name` parameter cannot be empty string or a slash" - end + COMMANDS = { + '0.90' => lambda { |arguments, node_number| + <<-COMMAND.gsub(/ /, '') + #{arguments[:command]} \ + -f \ + -D es.cluster.name=#{arguments[:cluster_name]} \ + -D es.node.name=#{arguments[:node_name]}-#{node_number} \ + -D es.http.port=#{arguments[:port].to_i + (node_number-1)} \ + -D es.path.data=#{arguments[:path_data]} \ + -D es.path.work=#{arguments[:path_work]} \ + -D es.path.logs=#{arguments[:path_logs]} \ + -D es.cluster.routing.allocation.disk.threshold_enabled=false \ + -D es.network.host=#{arguments[:network_host]} \ + -D es.discovery.zen.ping.multicast.enabled=#{arguments[:multicast_enabled]} \ + -D es.script.inline=true \ + -D es.script.indexed=true \ + -D es.node.test=true \ + -D es.node.testattr=test \ + -D es.node.bench=true \ + -D es.path.repo=/tmp \ + -D es.repositories.url.allowed_urls=http://snapshot.test* \ + -D es.logger.level=DEBUG \ + #{arguments[:es_params]} \ + > /dev/null + COMMAND + }, - if running? :on => arguments[:port], :as => arguments[:cluster_name] - print "[!] Elasticsearch cluster already running".ansi(:red) - wait_for_green(arguments[:port], arguments[:timeout]) - return false - end - - # Wipe out data on disk for this cluster name by default - FileUtils.rm_rf "#{arguments[:path_data]}/#{arguments[:cluster_name]}" if clear_cluster - - print "Starting ".ansi(:faint) + - @@number_of_nodes.to_s.ansi(:bold, :faint) + - " Elasticsearch nodes..".ansi(:faint) - - pids = [] - - @@number_of_nodes.times do |n| - n += 1 - command = <<-COMMAND - #{arguments[:command]} \ + '1.0' => lambda { |arguments, node_number| + <<-COMMAND.gsub(/ /, '') + #{arguments[:command]} \ -D es.foreground=yes \ -D es.cluster.name=#{arguments[:cluster_name]} \ - -D es.node.name=#{arguments[:node_name]}-#{n} \ - -D es.http.port=#{arguments[:port].to_i + (n-1)} \ + -D es.node.name=#{arguments[:node_name]}-#{node_number} \ + -D es.http.port=#{arguments[:port].to_i + (node_number-1)} \ -D es.path.data=#{arguments[:path_data]} \ -D es.path.work=#{arguments[:path_work]} \ -D es.path.logs=#{arguments[:path_logs]} \ -D es.cluster.routing.allocation.disk.threshold_enabled=false \ - -D es.network.host=#{@@network_host} \ + -D es.network.host=#{arguments[:network_host]} \ -D es.discovery.zen.ping.multicast.enabled=#{arguments[:multicast_enabled]} \ -D es.script.inline=on \ -D es.script.indexed=on \ -D es.node.test=true \ -D es.node.testattr=test \ -D es.node.bench=true \ -D es.path.repo=/tmp \ -D es.repositories.url.allowed_urls=http://snapshot.test* \ + -D es.logger.level=#{ENV['DEBUG'] ? 'DEBUG' : 'INFO'} \ + #{arguments[:es_params]} \ + > /dev/null + COMMAND + }, + + '2.0' => lambda { |arguments, node_number| + <<-COMMAND.gsub(/ /, '') + #{arguments[:command]} \ + -D es.foreground=yes \ + -D es.cluster.name=#{arguments[:cluster_name]} \ + -D es.node.name=#{arguments[:node_name]}-#{node_number} \ + -D es.http.port=#{arguments[:port].to_i + (node_number-1)} \ + -D es.path.data=#{arguments[:path_data]} \ + -D es.path.work=#{arguments[:path_work]} \ + -D es.path.logs=#{arguments[:path_logs]} \ + -D es.cluster.routing.allocation.disk.threshold_enabled=false \ + -D es.network.host=#{arguments[:network_host]} \ + -D es.script.inline=true \ + -D es.script.stored=true \ + -D es.node.attr.testattr=test \ + -D es.path.repo=/tmp \ + -D es.repositories.url.allowed_urls=http://snapshot.test* \ -D es.logger.level=DEBUG \ #{arguments[:es_params]} \ > /dev/null - COMMAND - STDERR.puts command.gsub(/ {1,}/, ' ') if ENV['DEBUG'] + COMMAND + }, - pid = Process.spawn(command) - Process.detach pid - pids << pid - end + '5.0' => lambda { |arguments, node_number| + <<-COMMAND.gsub(/ /, '') + #{arguments[:command]} \ + -E cluster.name=#{arguments[:cluster_name]} \ + -E node.name=#{arguments[:node_name]}-#{node_number} \ + -E http.port=#{arguments[:port].to_i + (node_number-1)} \ + -E path.data=#{arguments[:path_data]} \ + -E path.logs=#{arguments[:path_logs]} \ + -E cluster.routing.allocation.disk.threshold_enabled=false \ + -E network.host=#{arguments[:network_host]} \ + -E script.inline=true \ + -E script.stored=true \ + -E node.attr.testattr=test \ + -E path.repo=/tmp \ + -E repositories.url.allowed_urls=http://snapshot.test* \ + -E discovery.zen.minimum_master_nodes=#{arguments[:number_of_nodes]-1} \ + -E logger.level=DEBUG \ + #{arguments[:es_params]} \ + > /dev/null + COMMAND + } + } - # Check for proceses running - if `ps -p #{pids.join(' ')}`.split("\n").size < @@number_of_nodes+1 - STDERR.puts "", "[!!!] Process failed to start (see output above)".ansi(:red) - exit(1) + # Create a new instance of the Cluster class + # + # @option arguments [String] :cluster_name Cluster name (default: `elasticsearch_test`) + # @option arguments [Integer] :nodes Number of desired nodes (default: 2) + # @option arguments [String] :command Elasticsearch command (default: `elasticsearch`) + # @option arguments [String] :port Starting port number; will be auto-incremented (default: 9250) + # @option arguments [String] :node_name The node name (will be appended with a number) + # @option arguments [String] :path_data Path to the directory to store data in + # @option arguments [String] :path_work Path to the directory with auxiliary files + # @option arguments [String] :path_logs Path to the directory with log files + # @option arguments [Boolean] :multicast_enabled Whether multicast is enabled (default: true) + # @option arguments [Integer] :timeout Timeout when starting the cluster (default: 30) + # @option arguments [String] :network_host The host that nodes will bind on and publish to + # @option arguments [Boolean] :clear_cluster Wipe out cluster content on startup (default: true) + # + # You can also use environment variables to set the constructor options (see source). + # + # @see Cluster#start + # + def initialize(arguments={}) + @arguments = arguments + + @arguments[:command] ||= ENV.fetch('TEST_CLUSTER_COMMAND', 'elasticsearch') + @arguments[:port] ||= ENV.fetch('TEST_CLUSTER_PORT', 9250).to_i + @arguments[:cluster_name] ||= ENV.fetch('TEST_CLUSTER_NAME', __default_cluster_name).chomp + @arguments[:node_name] ||= ENV.fetch('TEST_CLUSTER_NODE_NAME', 'node') + @arguments[:path_data] ||= ENV.fetch('TEST_CLUSTER_DATA', '/tmp/elasticsearch_test') + @arguments[:path_work] ||= ENV.fetch('TEST_CLUSTER_TMP', '/tmp') + @arguments[:path_logs] ||= ENV.fetch('TEST_CLUSTER_LOGS', '/tmp/log/elasticsearch') + @arguments[:es_params] ||= ENV.fetch('TEST_CLUSTER_PARAMS', '') + @arguments[:multicast_enabled] ||= ENV.fetch('TEST_CLUSTER_MULTICAST', 'true') + @arguments[:timeout] ||= ENV.fetch('TEST_CLUSTER_TIMEOUT', 30).to_i + @arguments[:number_of_nodes] ||= ENV.fetch('TEST_CLUSTER_NODES', 2).to_i + @arguments[:network_host] ||= ENV.fetch('TEST_CLUSTER_NETWORK_HOST', __default_network_host) + + @clear_cluster = !!@arguments[:clear_cluster] || (ENV.fetch('TEST_CLUSTER_CLEAR', 'true') != 'false') + + # Make sure `cluster_name` is not dangerous + raise ArgumentError, "The `cluster_name` argument cannot be empty string or a slash" \ + if @arguments[:cluster_name] =~ /^[\/\\]?$/ end - wait_for_green(arguments[:port], arguments[:timeout]) - return true - end + # Starts a cluster + # + # Launches the specified number of nodes in a test-suitable configuration and prints + # information about the cluster -- unless this specific cluster is already running. + # + # @example Start a cluster with the default configuration (2 nodes, installed version, etc) + # Elasticsearch::Extensions::Test::Cluster::Cluster.new.start + # + # @example Start a cluster with a custom configuration + # Elasticsearch::Extensions::Test::Cluster::Cluster.new( + # cluster_name: 'my-cluster', + # nodes: 3, + # node_name: 'my-node', + # port: 9350 + # ).start + # + # @example Start a cluster with a different Elasticsearch version + # Elasticsearch::Extensions::Test::Cluster::Cluster.new( + # command: "/usr/local/Cellar/elasticsearch/1.0.0.Beta2/bin/elasticsearch" + # ).start + # + # @return Boolean,Array + # @see Cluster#stop + # + def start + if self.running? + STDOUT.print "[!] Elasticsearch cluster already running".ansi(:red) + return false + end - # Stop the cluster. - # - # Fetches the PID numbers from "Nodes Info" API and terminates matching nodes. - # - # @example Stop the default cluster - # Elasticsearch::Extensions::Test::Cluster.stop - # - # @example Stop the cluster reachable on specific port - # Elasticsearch::Extensions::Test::Cluster.stop port: 9350 - # - # @return Boolean - # @see Cluster#start Cluster.start - # - def stop(arguments={}) - arguments[:port] ||= (ENV['TEST_CLUSTER_PORT'] || 9250).to_i - arguments[:network_host] ||= ENV.fetch('TEST_CLUSTER_NETWORK_HOST', @@network_host) + __remove_cluster_data - nodes = begin - JSON.parse(Net::HTTP.get(URI("http://#{arguments[:network_host]}:#{arguments[:port]}/_nodes/?process"))) - rescue Exception => e - STDERR.puts "[!] Exception raised when stopping the cluster: #{e.inspect}".ansi(:red) - nil + STDOUT.print "Starting ".ansi(:faint) + arguments[:number_of_nodes].to_s.ansi(:bold, :faint) + + " Elasticsearch nodes..".ansi(:faint) + pids = [] + + STDERR.puts "Using Elasticsearch version [#{version}]" if ENV['DEBUG'] + + arguments[:number_of_nodes].times do |n| + n += 1 + command = __command(version, arguments, n) + STDERR.puts command.gsub(/ {1,}/, ' ') if ENV['DEBUG'] + + pid = Process.spawn(command) + Process.detach pid + pids << pid + end + + __check_for_running_processes(pids) + wait_for_green + __print_cluster_info + + return true end - return false if nodes.nil? or nodes.empty? + # Stops the cluster + # + # Fetches the PID numbers from "Nodes Info" API and terminates matching nodes. + # + # @example Stop the default cluster + # Elasticsearch::Extensions::Test::Cluster::Cluster.new.stop + # + # @example Stop the cluster reachable on specific port + # Elasticsearch::Extensions::Test::Cluster::Cluster.new(port: 9350).stop + # + # @return Boolean,Array + # @see Cluster#start + # + def stop + begin + nodes = __get_nodes + rescue Exception => e + STDERR.puts "[!] Exception raised when stopping the cluster: #{e.inspect}".ansi(:red) + nil + end - pids = nodes['nodes'].map { |id, info| info['process']['id'] } + return false if nodes.nil? or nodes.empty? - unless pids.empty? - print "\nStopping Elasticsearch nodes... ".ansi(:faint) - pids.each_with_index do |pid, i| - ['INT','KILL'].each do |signal| - begin - Process.kill signal, pid - rescue Exception => e - print "[#{e.class}] PID #{pid} not found. ".ansi(:red) - end + pids = nodes['nodes'].map { |id, info| info['process']['id'] } - # Give the system some breathing space to finish... - sleep 1 + unless pids.empty? + STDOUT.print "\nStopping Elasticsearch nodes... ".ansi(:faint) + pids.each_with_index do |pid, i| + ['INT','KILL'].each do |signal| + begin + Process.kill signal, pid + rescue Exception => e + STDOUT.print "[#{e.class}] PID #{pid} not found. ".ansi(:red) + end - # Check that pid really is dead - begin - Process.getpgid( pid ) - # `getpgid` will raise error if pid is dead, so if we get here, try next signal. - next - rescue Errno::ESRCH - print "stopped PID #{pid} with #{signal} signal. ".ansi(:green) - break # pid is dead + # Give the system some breathing space to finish... + Kernel.sleep 1 + + # Check that pid really is dead + begin + Process.getpgid pid + # `getpgid` will raise error if pid is dead, so if we get here, try next signal + next + rescue Errno::ESRCH + STDOUT.print "Stopped PID #{pid}".ansi(:green) + + (ENV['DEBUG'] ? " with #{signal} signal".ansi(:green) : '') + + ". ".ansi(:green) + break # pid is dead + end end end + STDOUT.puts + else + return false end - puts - else - false + + return pids end - return pids - end + # Returns true when a specific test node is running within the cluster + # + # @return Boolean + # + def running? + if cluster_health = Timeout::timeout(0.25) { __get_cluster_health } rescue nil + return cluster_health['cluster_name'] == arguments[:cluster_name] && \ + cluster_health['number_of_nodes'] == arguments[:number_of_nodes] + end + return false + end - # Returns true when a specific test node is running within the cluster. - # - # @option arguments [Integer] :on The port on which the node is running. - # @option arguments [String] :as The cluster name. - # @option arguments [Integer] :num Number of nodes in the cluster. - # - # @return Boolean - # - def running?(arguments={}) - port = arguments[:on] || (ENV['TEST_CLUSTER_PORT'] || 9250).to_i - cluster_name = arguments[:as] || (ENV.fetch('TEST_CLUSTER_NAME', @@default_cluster_name).chomp) - number_of_nodes = arguments[:num] || (ENV.fetch('TEST_CLUSTER_NODES', @@number_of_nodes)).to_i + # Waits until the cluster is green and prints information about it + # + # @return Boolean + # + def wait_for_green + __wait_for_status('green', 60) + end - if cluster_health = Timeout::timeout(0.25) { __get_cluster_health(port) } rescue nil - return cluster_health['cluster_name'] == cluster_name && \ - cluster_health['number_of_nodes'] == number_of_nodes + # Returns the major version of Elasticsearch + # + # @return String + # @see __determine_version + # + def version + @version ||= __determine_version end - return false - end - # Waits until the cluster is green and prints information - # - # @example Print the information about the default cluster - # Elasticsearch::Extensions::Test::Cluster.wait_for_green - # - # @param (see #__wait_for_status) - # - # @return Boolean - # - def wait_for_green(port=9250, timeout=60) - __wait_for_status('green', port, timeout) - end - # Blocks the process and waits for the cluster to be in a "green" state. - # - # Prints information about the cluster on STDOUT if the cluster is available. - # - # @param status [String] The status to wait for (yellow, green) - # @param port [Integer] The port on which the cluster is reachable - # @param timeout [Integer] The explicit timeout for the operation - # - # @api private - # - # @return Boolean - # - def __wait_for_status(status='green', port=9250, timeout=30) - uri = URI("http://#{@@network_host}:#{port}/_cluster/health?wait_for_status=#{status}") + # Returns default `:network_host` setting based on the version + # + # @api private + # + # @return String + # + def __default_network_host + case version + when /^0|^1/ + '0.0.0.0' + when /^2/ + '0.0.0.0' + when /^5/ + '_local_' + else + raise RuntimeError, "Cannot determine default network host from version [#{version}]" + end + end - Timeout::timeout(timeout) do - loop do - response = begin - JSON.parse(Net::HTTP.get(uri)) - rescue Exception => e - STDERR.puts e.inspect if ENV['DEBUG'] - nil + # Returns a reasonably unique cluster name + # + # @api private + # + # @return String + # + def __default_cluster_name + "elasticsearch-test-#{Socket.gethostname.downcase}" + end + + # Returns the HTTP URL for the cluster based on `:network_host` setting + # + # @api private + # + # @return String + # + def __cluster_url + if '_local_' == arguments[:network_host] + "http://localhost:#{arguments[:port]}" + else + "http://#{arguments[:network_host]}:#{arguments[:port]}" + end + end + + # Determine Elasticsearch version to be launched + # + # Tries to parse the version number from the `lib/elasticsearch-X.Y.Z.jar` file, + # it not available, uses `elasticsearch --version` or `elasticsearch -v` + # + # @api private + # + # @return String + # + def __determine_version + path_to_lib = File.dirname(arguments[:command]) + '/../lib/' + + jar = Dir.entries(path_to_lib).select { |f| f.start_with? 'elasticsearch' }.first if File.exist? path_to_lib + + version = if jar + if m = jar.match(/elasticsearch\-(\d+\.\d+.\d+).*/) + m[1] + else + raise RuntimeError, "Cannot determine Elasticsearch version from jar [#{jar}]" end + else + STDERR.puts "[!] Cannot find Elasticsearch .jar from path to command [#{arguments[:command]}], using `elasticsearch --version`" if ENV['DEBUG'] - STDERR.puts response.inspect if response && ENV['DEBUG'] + output = '' - if response && response['status'] == status && ( @@number_of_nodes.nil? || @@number_of_nodes == response['number_of_nodes'].to_i ) - __print_cluster_info(port) and break + begin + # First, try the new `--version` syntax... + STDERR.puts "Running [#{arguments[:command]} --version] to determine version" if ENV['DEBUG'] + Timeout::timeout(10) { output = `#{arguments[:command]} --version` } + rescue Timeout::Error + # ...else, the new `-v` syntax + STDERR.puts "Running [#{arguments[:command]} -v] to determine version" if ENV['DEBUG'] + output = `#{arguments[:command]} -v` end - print '.'.ansi(:faint) - sleep 1 + STDERR.puts "> #{output}" if ENV['DEBUG'] + + if output.empty? + raise RuntimeError, "Cannot determine Elasticsearch version from [#{arguments[:command]} --version] or [#{arguments[:command]} -v]" + end + + if m = output.match(/Version: (\d\.\d.\d).*,/) + m[1] + else + raise RuntimeError, "Cannot determine Elasticsearch version from elasticsearch --version output [#{output}]" + end end + + case version + when /^0\.90.*/ + '0.90' + when /^1\..*/ + '1.0' + when /^2\..*/ + '2.0' + when /^5\..*/ + '5.0' + else + raise RuntimeError, "Cannot determine major version from [#{version}]" + end end - return true - end + # Returns the launch command for a specific version + # + # @api private + # + # @return String + # + def __command(version, arguments, node_number) + if command = COMMANDS[version] + command.call(arguments, node_number) + else + raise ArgumentError, "Cannot find command for version [#{version}]" + end + end - # Print information about the cluster on STDOUT - # - # @api private - # - def __print_cluster_info(port) - health = JSON.parse(Net::HTTP.get(URI("http://#{@@network_host}:#{port}/_cluster/health"))) - nodes = JSON.parse(Net::HTTP.get(URI("http://#{@@network_host}:#{port}/_nodes/process,http"))) - master = JSON.parse(Net::HTTP.get(URI("http://#{@@network_host}:#{port}/_cluster/state")))['master_node'] + # Blocks the process and waits for the cluster to be in a "green" state + # + # Prints information about the cluster on STDOUT if the cluster is available. + # + # @param status [String] The status to wait for (yellow, green) + # @param timeout [Integer] The explicit timeout for the operation + # + # @api private + # + # @return Boolean + # + def __wait_for_status(status='green', timeout=30) + Timeout::timeout(timeout) do + loop do + response = __get_cluster_health(status) - puts "\n", - ('-'*80).ansi(:faint), - 'Cluster: '.ljust(20).ansi(:faint) + health['cluster_name'].to_s.ansi(:faint), - 'Status: '.ljust(20).ansi(:faint) + health['status'].to_s.ansi(:faint), - 'Nodes: '.ljust(20).ansi(:faint) + health['number_of_nodes'].to_s.ansi(:faint) + if response && response['status'] == status && ( arguments[:number_of_nodes].nil? || arguments[:number_of_nodes].to_i == response['number_of_nodes'].to_i ) + break + end - nodes['nodes'].each do |id, info| - m = id == master ? '*' : '+' - puts ''.ljust(20) + - "#{m} ".ansi(:faint) + - "#{info['name'].ansi(:bold)} ".ansi(:faint) + - "| version: #{info['version'] rescue 'N/A'}, ".ansi(:faint) + - "pid: #{info['process']['id'] rescue 'N/A'}, ".ansi(:faint) + - "address: #{info['http']['bound_address'] rescue 'N/A'}".ansi(:faint) + STDOUT.print '.'.ansi(:faint) + sleep 1 + end + end + + return true end - end - # Tries to load cluster health information - # - # @api private - # - def __get_cluster_health(port=9250) - uri = URI("http://#{@@network_host}:#{port}/_cluster/health") - if response = Net::HTTP.get(uri) rescue nil - return JSON.parse(response) + # Print information about the cluster on STDOUT + # + # @api private + # + # @return Nil + # + def __print_cluster_info + health = JSON.parse(Net::HTTP.get(URI("#{__cluster_url}/_cluster/health"))) + nodes = if version == '0.90' + JSON.parse(Net::HTTP.get(URI("#{__cluster_url}/_nodes/?process&http"))) + else + JSON.parse(Net::HTTP.get(URI("#{__cluster_url}/_nodes/process,http"))) + end + master = JSON.parse(Net::HTTP.get(URI("#{__cluster_url}/_cluster/state")))['master_node'] + + puts "\n", + ('-'*80).ansi(:faint), + 'Cluster: '.ljust(20).ansi(:faint) + health['cluster_name'].to_s.ansi(:faint), + 'Status: '.ljust(20).ansi(:faint) + health['status'].to_s.ansi(:faint), + 'Nodes: '.ljust(20).ansi(:faint) + health['number_of_nodes'].to_s.ansi(:faint) + + nodes['nodes'].each do |id, info| + m = id == master ? '*' : '+' + puts ''.ljust(20) + + "#{m} ".ansi(:faint) + + "#{info['name'].ansi(:bold)} ".ansi(:faint) + + "| version: #{info['version'] rescue 'N/A'}, ".ansi(:faint) + + "pid: #{info['process']['id'] rescue 'N/A'}, ".ansi(:faint) + + "address: #{info['http']['bound_address'] rescue 'N/A'}".ansi(:faint) + end end - end - extend self + # Tries to load cluster health information + # + # @api private + # + # @return Hash,Nil + # + def __get_cluster_health(status=nil) + uri = URI("#{__cluster_url}/_cluster/health") + uri.query = "wait_for_status=#{status}" if status + + begin + response = Net::HTTP.get(uri) + rescue Exception => e + STDERR.puts e.inspect if ENV['DEBUG'] + return nil + end + + JSON.parse(response) + end + + # Remove the data directory (unless it has been disabled by arguments) + # + # @api private + # + def __remove_cluster_data + # Wipe out data on disk for this cluster name by default + FileUtils.rm_rf "#{arguments[:path_data]}/#{arguments[:cluster_name]}" if @clear_cluster + end + + + # Check whether process for PIDs are running + # + # @api private + # + def __check_for_running_processes(pids) + if `ps -p #{pids.join(' ')}`.split("\n").size < arguments[:number_of_nodes]+1 + STDERR.puts "", "[!!!] Process failed to start (see output above)".ansi(:red) + exit(1) + end + end + + # Get the information about nodes + # + # @api private + # + def __get_nodes + JSON.parse(Net::HTTP.get(URI("#{__cluster_url}/_nodes/process"))) + end + end end end end end