lib/etcd/client.rb in etcd-0.0.5 vs lib/etcd/client.rb in etcd-0.0.6

- old
+ new

@@ -15,33 +15,32 @@ class Client include Etcd::Helpers include Etcd::Lockable - attr_reader :host, :port, :http, :allow_redirect + attr_reader :host, :port, :http, :allow_redirect, :use_ssl, :verify_mode ## # Creates a new instance of Etcd::Client. It accepts a hash +opts+ as argument # # @param [Hash] opts The options for new Etcd::Client object # @opts [String] :host IP address of the etcd server (default is '127.0.0.1') # @opts [Fixnum] :port Port number of the etcd server (default is 4001) + # @opts [Fixnum] :read_timeout Set default HTTP read timeout for all api calls (default is 60) def initialize(opts={}) @host = opts[:host] || '127.0.0.1' @port = opts[:port] || 4001 @read_timeout = opts[:read_timeout] || 60 - if opts.has_key?(:allow_redirect) - @allow_redirect = opts[:allow_redirect] - else - @allow_redirect = true - end + @allow_redirect = opts.has_key?(:allow_redirect) ? opts[:allow_redirect] : true + @use_ssl = opts[:use_ssl] || false + @verify_mode = opts[:verify_mode] || OpenSSL::SSL::VERIFY_PEER end - # Currently use 'v1' as version for etcd store + # Currently use 'v2' as version for etcd store def version_prefix - '/v1' + '/v2' end # Lists all machines in the cluster def machines api_execute( version_prefix + '/machines', :get).split(",").map(&:strip) @@ -71,76 +70,102 @@ # * ttl - shelf life of a key (in secsonds) (optional) def test_and_set(key, value, prevValue, ttl = nil) path = key_endpoint + key payload = {'value' => value, 'prevValue' => prevValue } payload['ttl'] = ttl unless ttl.nil? - response = api_execute(path, :post, payload) + response = api_execute(path, :put, params: payload) json2obj(response) end + + def create(key, value, ttl = nil) + path = key_endpoint + key + payload = {value: value, prevExist: false } + payload['ttl'] = ttl unless ttl.nil? + response = api_execute(path, :put, params: payload) + json2obj(response) + end + + def update(key, value, ttl = nil) + path = key_endpoint + key + payload = {value: value, prevExist: true } + payload['ttl'] = ttl unless ttl.nil? + response = api_execute(path, :put, params: payload) + json2obj(response) + end + # Adds a new key with specified value and ttl, overwrites old values if exists # # This method has following parameters as argument # * key - whose value to be set # * value - value to be set for specified key # * ttl - shelf life of a key (in secsonds) (optional) def set(key, value, ttl=nil) path = key_endpoint + key payload = {'value' => value} payload['ttl'] = ttl unless ttl.nil? - response = api_execute(path, :post, payload) + response = api_execute(path, :put, params: payload) json2obj(response) end # Deletes a key along with all associated data # # This method has following parameters as argument # * key - key to be deleted - def delete(key) - response = api_execute(key_endpoint + key, :delete) + def delete(key,opts={}) + response = api_execute(key_endpoint + key, :delete, params:opts) json2obj(response) end # Retrives a key with its associated data, if key is not present it will return with message "Key Not Found" # # This method has following parameters as argument # * key - whose data to be retrive - def get(key) - response = api_execute(key_endpoint + key, :get) + def get(key, opts={}) + response = api_execute(key_endpoint + key, :get, params:opts) json2obj(response) end # Gives a notification when specified key changes # # This method has following parameters as argument - # * key - key to be watched - # * index - etcd server index of specified key (optional) - def watch(key, index=nil) + # @ key - key to be watched + # @options [Hash] additional options for watching a key + # @options [Fixnum] :index watch the specified key from given index + # @options [Fixnum] :timeout specify http timeout (defaults to read_timeout value) + def watch(key, options={}) + timeout = options[:timeout] || @read_timeout + index = options[:waitIndex] || options[:index] response = if index.nil? - api_execute(watch_endpoint + key, :get) + api_execute(key_endpoint + key, :get, timeout: timeout, params:{wait: true}) else - api_execute(watch_endpoint + key, :post, {'index' => index}) + api_execute(key_endpoint + key, :get, timeout: timeout, params: {wait: true, waitIndex: index}) end json2obj(response) end # This method sends api request to etcd server. # # This method has following parameters as argument # * path - etcd server path (etcd server end point) # * method - the request method used - # * params - any additional parameters used by request method (optional) - def api_execute(path, method, params=nil) + # * options - any additional parameters used by request method (optional) + def api_execute(path, method, options={}) + params = options[:params] + timeout = options[:timeout] || @read_timeout + http = if path=~/^http/ uri = URI.parse(path) path = uri.path Net::HTTP.new(uri.host, uri.port) else Net::HTTP.new(host, port) end - http.read_timeout = @read_timeout + http.read_timeout = timeout + http.use_ssl = use_ssl + http.verify_mode = verify_mode case method when :get unless params.nil? encoded_params = URI.encode_www_form(params) @@ -150,27 +175,34 @@ when :post encoded_params = URI.encode_www_form(params) req = Net::HTTP::Post.new(path) req.body= encoded_params Log.debug("Setting body for post '#{encoded_params}'") + when :put + encoded_params = URI.encode_www_form(params) + req = Net::HTTP::Put.new(path) + req.body= encoded_params + Log.debug("Setting body for put '#{encoded_params}'") when :delete unless params.nil? encoded_params = URI.encode_www_form(params) path+= "?" + encoded_params end req = Net::HTTP::Delete.new(path) + else + raise "Unknown http action: #{method}" end Log.debug("Invoking: '#{req.class}' against '#{path}") res = http.request(req) Log.debug("Response code: #{res.code}") if res.is_a?(Net::HTTPSuccess) Log.debug("Http success") res.body elsif redirect?(res.code.to_i) and allow_redirect Log.debug("Http redirect, following") - api_execute(res['location'], method, params) + api_execute(res['location'], method, params: params) else Log.debug("Http error") Log.debug(res.body) res.error! end @@ -181,13 +213,23 @@ (code >= 300) and (code < 400) end def json2obj(json) obj = JSON.parse(json) - if obj.is_a?(Array) - obj.map{|e| OpenStruct.new(e)} + if obj.has_key?('nodes') + obj.map do |e| + node2obj(e) + end else - OpenStruct.new(obj) + node2obj(obj) end + end + + def node2obj(hash) + h = hash.dup + h[:value] = h['node']['value'] if h.has_key?('node') and h['node'].has_key?('value') + o = OpenStruct.new(h) + o.node = OpenStruct.new(o.node) if h.has_key?('node') + o end end end