lib/etcd/client.rb in etcd-0.0.5 vs lib/etcd/client.rb in etcd-0.0.6
- old
+ new
@@ -15,33 +15,32 @@
class Client
include Etcd::Helpers
include Etcd::Lockable
- attr_reader :host, :port, :http, :allow_redirect
+ attr_reader :host, :port, :http, :allow_redirect, :use_ssl, :verify_mode
##
# Creates a new instance of Etcd::Client. It accepts a hash +opts+ as argument
#
# @param [Hash] opts The options for new Etcd::Client object
# @opts [String] :host IP address of the etcd server (default is '127.0.0.1')
# @opts [Fixnum] :port Port number of the etcd server (default is 4001)
+ # @opts [Fixnum] :read_timeout Set default HTTP read timeout for all api calls (default is 60)
def initialize(opts={})
@host = opts[:host] || '127.0.0.1'
@port = opts[:port] || 4001
@read_timeout = opts[:read_timeout] || 60
- if opts.has_key?(:allow_redirect)
- @allow_redirect = opts[:allow_redirect]
- else
- @allow_redirect = true
- end
+ @allow_redirect = opts.has_key?(:allow_redirect) ? opts[:allow_redirect] : true
+ @use_ssl = opts[:use_ssl] || false
+ @verify_mode = opts[:verify_mode] || OpenSSL::SSL::VERIFY_PEER
end
- # Currently use 'v1' as version for etcd store
+ # Currently use 'v2' as version for etcd store
def version_prefix
- '/v1'
+ '/v2'
end
# Lists all machines in the cluster
def machines
api_execute( version_prefix + '/machines', :get).split(",").map(&:strip)
@@ -71,76 +70,102 @@
# * ttl - shelf life of a key (in secsonds) (optional)
def test_and_set(key, value, prevValue, ttl = nil)
path = key_endpoint + key
payload = {'value' => value, 'prevValue' => prevValue }
payload['ttl'] = ttl unless ttl.nil?
- response = api_execute(path, :post, payload)
+ response = api_execute(path, :put, params: payload)
json2obj(response)
end
+
+ def create(key, value, ttl = nil)
+ path = key_endpoint + key
+ payload = {value: value, prevExist: false }
+ payload['ttl'] = ttl unless ttl.nil?
+ response = api_execute(path, :put, params: payload)
+ json2obj(response)
+ end
+
+ def update(key, value, ttl = nil)
+ path = key_endpoint + key
+ payload = {value: value, prevExist: true }
+ payload['ttl'] = ttl unless ttl.nil?
+ response = api_execute(path, :put, params: payload)
+ json2obj(response)
+ end
+
# Adds a new key with specified value and ttl, overwrites old values if exists
#
# This method has following parameters as argument
# * key - whose value to be set
# * value - value to be set for specified key
# * ttl - shelf life of a key (in secsonds) (optional)
def set(key, value, ttl=nil)
path = key_endpoint + key
payload = {'value' => value}
payload['ttl'] = ttl unless ttl.nil?
- response = api_execute(path, :post, payload)
+ response = api_execute(path, :put, params: payload)
json2obj(response)
end
# Deletes a key along with all associated data
#
# This method has following parameters as argument
# * key - key to be deleted
- def delete(key)
- response = api_execute(key_endpoint + key, :delete)
+ def delete(key,opts={})
+ response = api_execute(key_endpoint + key, :delete, params:opts)
json2obj(response)
end
# Retrives a key with its associated data, if key is not present it will return with message "Key Not Found"
#
# This method has following parameters as argument
# * key - whose data to be retrive
- def get(key)
- response = api_execute(key_endpoint + key, :get)
+ def get(key, opts={})
+ response = api_execute(key_endpoint + key, :get, params:opts)
json2obj(response)
end
# Gives a notification when specified key changes
#
# This method has following parameters as argument
- # * key - key to be watched
- # * index - etcd server index of specified key (optional)
- def watch(key, index=nil)
+ # @ key - key to be watched
+ # @options [Hash] additional options for watching a key
+ # @options [Fixnum] :index watch the specified key from given index
+ # @options [Fixnum] :timeout specify http timeout (defaults to read_timeout value)
+ def watch(key, options={})
+ timeout = options[:timeout] || @read_timeout
+ index = options[:waitIndex] || options[:index]
response = if index.nil?
- api_execute(watch_endpoint + key, :get)
+ api_execute(key_endpoint + key, :get, timeout: timeout, params:{wait: true})
else
- api_execute(watch_endpoint + key, :post, {'index' => index})
+ api_execute(key_endpoint + key, :get, timeout: timeout, params: {wait: true, waitIndex: index})
end
json2obj(response)
end
# This method sends api request to etcd server.
#
# This method has following parameters as argument
# * path - etcd server path (etcd server end point)
# * method - the request method used
- # * params - any additional parameters used by request method (optional)
- def api_execute(path, method, params=nil)
+ # * options - any additional parameters used by request method (optional)
+ def api_execute(path, method, options={})
+ params = options[:params]
+ timeout = options[:timeout] || @read_timeout
+
http = if path=~/^http/
uri = URI.parse(path)
path = uri.path
Net::HTTP.new(uri.host, uri.port)
else
Net::HTTP.new(host, port)
end
- http.read_timeout = @read_timeout
+ http.read_timeout = timeout
+ http.use_ssl = use_ssl
+ http.verify_mode = verify_mode
case method
when :get
unless params.nil?
encoded_params = URI.encode_www_form(params)
@@ -150,27 +175,34 @@
when :post
encoded_params = URI.encode_www_form(params)
req = Net::HTTP::Post.new(path)
req.body= encoded_params
Log.debug("Setting body for post '#{encoded_params}'")
+ when :put
+ encoded_params = URI.encode_www_form(params)
+ req = Net::HTTP::Put.new(path)
+ req.body= encoded_params
+ Log.debug("Setting body for put '#{encoded_params}'")
when :delete
unless params.nil?
encoded_params = URI.encode_www_form(params)
path+= "?" + encoded_params
end
req = Net::HTTP::Delete.new(path)
+ else
+ raise "Unknown http action: #{method}"
end
Log.debug("Invoking: '#{req.class}' against '#{path}")
res = http.request(req)
Log.debug("Response code: #{res.code}")
if res.is_a?(Net::HTTPSuccess)
Log.debug("Http success")
res.body
elsif redirect?(res.code.to_i) and allow_redirect
Log.debug("Http redirect, following")
- api_execute(res['location'], method, params)
+ api_execute(res['location'], method, params: params)
else
Log.debug("Http error")
Log.debug(res.body)
res.error!
end
@@ -181,13 +213,23 @@
(code >= 300) and (code < 400)
end
def json2obj(json)
obj = JSON.parse(json)
- if obj.is_a?(Array)
- obj.map{|e| OpenStruct.new(e)}
+ if obj.has_key?('nodes')
+ obj.map do |e|
+ node2obj(e)
+ end
else
- OpenStruct.new(obj)
+ node2obj(obj)
end
+ end
+
+ def node2obj(hash)
+ h = hash.dup
+ h[:value] = h['node']['value'] if h.has_key?('node') and h['node'].has_key?('value')
+ o = OpenStruct.new(h)
+ o.node = OpenStruct.new(o.node) if h.has_key?('node')
+ o
end
end
end