lib/etcd/client.rb in etcd-0.0.6 vs lib/etcd/client.rb in etcd-0.2.0.alpha
- old
+ new
@@ -1,235 +1,129 @@
+# Encoding: utf-8
+
require 'net/http'
require 'json'
require 'etcd/log'
-require 'etcd/mixins/helpers'
-require 'etcd/mixins/lockable'
-require 'ostruct'
+require 'etcd/stats'
+require 'etcd/keys'
+require 'etcd/exceptions'
+require 'etcd/mod/lock'
+require 'etcd/mod/leader'
-
module Etcd
##
- # This is the central ruby class for Etcd. It provides methods for all Etcd api calls.
- # It also provides few additional methods beyond the core Etcd api, like Etcd::Client#lock
- # and Etcd::Client#eternal_watch, they are defined in separate modules and included in this
- # class
+ # This is the central ruby class for Etcd. It provides methods for all
+ # etcd api calls. It also provides few additional methods beyond the core
+ # etcd api, like Etcd::Client#lock and Etcd::Client#eternal_watch, they
+ # are defined in separate modules and included in this class
class Client
- include Etcd::Helpers
- include Etcd::Lockable
+ HTTP_REDIRECT = ->(r){ r.is_a? Net::HTTPRedirection }
+ HTTP_SUCCESS = ->(r){ r.is_a? Net::HTTPSuccess }
+ HTTP_CLIENT_ERROR = ->(r){ r.is_a? Net::HTTPClientError }
- attr_reader :host, :port, :http, :allow_redirect, :use_ssl, :verify_mode
+ include Stats
+ include Keys
+ include Mod::Lock
+ include Mod::Leader
+ attr_reader :host, :port, :http, :allow_redirect
+ attr_reader :use_ssl, :verify_mode, :read_timeout
+
##
- # Creates a new instance of Etcd::Client. It accepts a hash +opts+ as argument
- #
+ # Creates an Etcd::Client object. It accepts a hash +opts+ as argument
+ #
# @param [Hash] opts The options for new Etcd::Client object
- # @opts [String] :host IP address of the etcd server (default is '127.0.0.1')
- # @opts [Fixnum] :port Port number of the etcd server (default is 4001)
- # @opts [Fixnum] :read_timeout Set default HTTP read timeout for all api calls (default is 60)
-
- def initialize(opts={})
+ # @opts [String] :host IP address of the etcd server (default 127.0.0.1)
+ # @opts [Fixnum] :port Port number of the etcd server (default 4001)
+ # @opts [Fixnum] :read_timeout set HTTP read timeouts (default 60)
+ def initialize(opts = {})
@host = opts[:host] || '127.0.0.1'
@port = opts[:port] || 4001
@read_timeout = opts[:read_timeout] || 60
- @allow_redirect = opts.has_key?(:allow_redirect) ? opts[:allow_redirect] : true
+ @allow_redirect = opts.key?(:allow_redirect) ? opts[:allow_redirect] : true
@use_ssl = opts[:use_ssl] || false
- @verify_mode = opts[:verify_mode] || OpenSSL::SSL::VERIFY_PEER
+ @verify_mode = opts.key?(:verify_mode) ? opts[:verify_mode] : OpenSSL::SSL::VERIFY_PEER
end
- # Currently use 'v2' as version for etcd store
+ # Returns the etcd api version that will be used for across API methods
def version_prefix
'/v2'
end
- # Lists all machines in the cluster
+ # Returns the etcd daemon version
+ def version
+ api_execute('/version', :get).body
+ end
+
+ # Returns array of all machines in the cluster
def machines
- api_execute( version_prefix + '/machines', :get).split(",").map(&:strip)
+ api_execute(version_prefix + '/machines', :get).body.split(',').map(&:strip)
end
- # Get the current leader in a cluster
+ # Get the current leader
def leader
- api_execute( version_prefix + '/leader', :get)
+ api_execute(version_prefix + '/leader', :get).body.strip
end
- # Lists all the data (keys, dir etc) present in etcd store
- def key_endpoint
- version_prefix + '/keys'
- end
-
- # Watches all keys and notifies if anyone changes
- def watch_endpoint
- version_prefix + '/watch'
- end
-
- # Set a new value for key if previous value of key is matched
- #
- # This method takes following parameters as argument
- # * key - whose value is going to change if previous value is matched
- # * value - new value to be set for specified key
- # * prevValue - value of a key to compare with existing value of key
- # * ttl - shelf life of a key (in secsonds) (optional)
- def test_and_set(key, value, prevValue, ttl = nil)
- path = key_endpoint + key
- payload = {'value' => value, 'prevValue' => prevValue }
- payload['ttl'] = ttl unless ttl.nil?
- response = api_execute(path, :put, params: payload)
- json2obj(response)
- end
-
-
- def create(key, value, ttl = nil)
- path = key_endpoint + key
- payload = {value: value, prevExist: false }
- payload['ttl'] = ttl unless ttl.nil?
- response = api_execute(path, :put, params: payload)
- json2obj(response)
- end
-
- def update(key, value, ttl = nil)
- path = key_endpoint + key
- payload = {value: value, prevExist: true }
- payload['ttl'] = ttl unless ttl.nil?
- response = api_execute(path, :put, params: payload)
- json2obj(response)
- end
-
- # Adds a new key with specified value and ttl, overwrites old values if exists
- #
- # This method has following parameters as argument
- # * key - whose value to be set
- # * value - value to be set for specified key
- # * ttl - shelf life of a key (in secsonds) (optional)
- def set(key, value, ttl=nil)
- path = key_endpoint + key
- payload = {'value' => value}
- payload['ttl'] = ttl unless ttl.nil?
- response = api_execute(path, :put, params: payload)
- json2obj(response)
- end
-
- # Deletes a key along with all associated data
- #
- # This method has following parameters as argument
- # * key - key to be deleted
- def delete(key,opts={})
- response = api_execute(key_endpoint + key, :delete, params:opts)
- json2obj(response)
- end
-
- # Retrives a key with its associated data, if key is not present it will return with message "Key Not Found"
- #
- # This method has following parameters as argument
- # * key - whose data to be retrive
- def get(key, opts={})
- response = api_execute(key_endpoint + key, :get, params:opts)
- json2obj(response)
- end
-
- # Gives a notification when specified key changes
- #
- # This method has following parameters as argument
- # @ key - key to be watched
- # @options [Hash] additional options for watching a key
- # @options [Fixnum] :index watch the specified key from given index
- # @options [Fixnum] :timeout specify http timeout (defaults to read_timeout value)
- def watch(key, options={})
- timeout = options[:timeout] || @read_timeout
- index = options[:waitIndex] || options[:index]
- response = if index.nil?
- api_execute(key_endpoint + key, :get, timeout: timeout, params:{wait: true})
- else
- api_execute(key_endpoint + key, :get, timeout: timeout, params: {wait: true, waitIndex: index})
- end
- json2obj(response)
- end
-
# This method sends api request to etcd server.
#
# This method has following parameters as argument
# * path - etcd server path (etcd server end point)
# * method - the request method used
# * options - any additional parameters used by request method (optional)
- def api_execute(path, method, options={})
-
+ def api_execute(path, method, options = {})
params = options[:params]
- timeout = options[:timeout] || @read_timeout
-
- http = if path=~/^http/
- uri = URI.parse(path)
- path = uri.path
- Net::HTTP.new(uri.host, uri.port)
- else
- Net::HTTP.new(host, port)
- end
- http.read_timeout = timeout
- http.use_ssl = use_ssl
- http.verify_mode = verify_mode
-
case method
when :get
- unless params.nil?
- encoded_params = URI.encode_www_form(params)
- path+= "?" + encoded_params
- end
- req = Net::HTTP::Get.new(path)
+ req = build_http_request(Net::HTTP::Get, path, params)
when :post
- encoded_params = URI.encode_www_form(params)
- req = Net::HTTP::Post.new(path)
- req.body= encoded_params
- Log.debug("Setting body for post '#{encoded_params}'")
+ req = build_http_request(Net::HTTP::Post, path, nil, params)
when :put
- encoded_params = URI.encode_www_form(params)
- req = Net::HTTP::Put.new(path)
- req.body= encoded_params
- Log.debug("Setting body for put '#{encoded_params}'")
+ req = build_http_request(Net::HTTP::Put, path, nil, params)
when :delete
- unless params.nil?
- encoded_params = URI.encode_www_form(params)
- path+= "?" + encoded_params
- end
- req = Net::HTTP::Delete.new(path)
+ req = build_http_request(Net::HTTP::Delete, path, params)
else
- raise "Unknown http action: #{method}"
+ fail "Unknown http action: #{method}"
end
-
+ timeout = options[:timeout] || @read_timeout
+ http = Net::HTTP.new(host, port)
+ http.read_timeout = timeout
+ http.use_ssl = use_ssl
+ http.verify_mode = verify_mode
Log.debug("Invoking: '#{req.class}' against '#{path}")
res = http.request(req)
Log.debug("Response code: #{res.code}")
- if res.is_a?(Net::HTTPSuccess)
- Log.debug("Http success")
- res.body
- elsif redirect?(res.code.to_i) and allow_redirect
- Log.debug("Http redirect, following")
- api_execute(res['location'], method, params: params)
- else
- Log.debug("Http error")
- Log.debug(res.body)
- res.error!
- end
+ process_http_request(res)
end
- private
- def redirect?(code)
- (code >= 300) and (code < 400)
- end
-
- def json2obj(json)
- obj = JSON.parse(json)
- if obj.has_key?('nodes')
- obj.map do |e|
- node2obj(e)
+ def process_http_request(res)
+ case res
+ when HTTP_SUCCESS
+ Log.debug('Http success')
+ res
+ when HTTP_REDIRECT
+ if allow_redirect
+ Log.debug('Http redirect, following')
+ api_execute(res['location'], method, params: params)
+ else
+ Log.debug('Http redirect not allowed')
+ res.error!
end
+ when HTTP_CLIENT_ERROR
+ fail Error.from_http_response(res)
else
- node2obj(obj)
+ Log.debug('Http error')
+ Log.debug(res.body)
+ res.error!
end
end
- def node2obj(hash)
- h = hash.dup
- h[:value] = h['node']['value'] if h.has_key?('node') and h['node'].has_key?('value')
- o = OpenStruct.new(h)
- o.node = OpenStruct.new(o.node) if h.has_key?('node')
- o
+ def build_http_request(klass, path, params = nil, body = nil)
+ path += '?' + URI.encode_www_form(params) unless params.nil?
+ req = klass.new(path)
+ req.body = URI.encode_www_form(body) unless body.nil?
+ Etcd::Log.debug("Built #{klass} path:'#{path}' body:'#{req.body}'")
+ req
end
end
end