lib/crate_ruby/client.rb in crate_ruby-0.0.9 vs lib/crate_ruby/client.rb in crate_ruby-0.1.0
- old
+ new
@@ -1,6 +1,5 @@
-# -*- coding: utf-8; -*-
#
# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
# license agreements. See the NOTICE file distributed with this work for
# additional information regarding copyright ownership. Crate licenses
# this file to you under the Apache License, Version 2.0 (the "License");
@@ -19,141 +18,155 @@
# with Crate these terms will supersede the license and you may use the
# software solely pursuant to the terms of the relevant commercial agreement.
require 'json'
require 'net/http'
+require 'base64'
+
module CrateRuby
+ # Client to interact with Crate.io DB
class Client
- DEFAULT_HOST = "127.0.0.1"
- DEFAULT_PORT = "4200"
+ DEFAULT_HOST = '127.0.0.1'.freeze
+ DEFAULT_PORT = '4200'.freeze
- attr_accessor :logger
+ attr_accessor :logger, :schema, :username, :password
# Currently only a single server is supported. Fail over will be implemented in upcoming versions
# @param [Array] servers An Array of servers including ports [127.0.0.1:4200, 10.0.0.1:4201]
- # @param [opts] Optional paramters
+ # @param [opts] opts Optional parameters
# * logger: Custom Logger
# * http_options [Hash]: Net::HTTP options (open_timeout, read_timeout)
+ # * schema [String]: Default schema to search in
# @return [CrateRuby::Client]
def initialize(servers = [], opts = {})
@servers = servers
@servers << "#{DEFAULT_HOST}:#{DEFAULT_PORT}" if servers.empty?
@logger = opts[:logger] || CrateRuby.logger
@http_options = opts[:http_options] || { read_timeout: 3600 }
+ @schema = opts[:schema] || 'doc'
+ @username = opts[:username]
+ @password = opts[:password]
end
def inspect
- %Q{#<CrateRuby::Client:#{object_id}>}
+ %(#<CrateRuby::Client:#{object_id}>)
end
# Creates a table
# client.create_table "posts", id: [:integer, "primary key"], my_column: :string, my_integer_col: :integer
# @param [String] table_name
# @param [Hash] column_definition
- # @option column_definition [String] key sets column name, value sets column type. an array passed as value can be used to set options like primary keys
+ # @option column_definition [String] key sets column name, value sets column type. an array passed as value can
+ # be used to set options like primary keys
# @return [ResultSet]
#
- def create_table(table_name, column_definition = {}, blob=false)
+ def create_table(table_name, column_definition = {})
cols = column_definition.to_a.map { |a| a.join(' ') }.join(', ')
- stmt = %Q{CREATE TABLE "#{table_name}" (#{cols})}
+ stmt = %{CREATE TABLE "#{table_name}" (#{cols})}
execute(stmt)
end
# Creates a table for storing blobs
# @param [String] name Table name
- # @param [Integer] shard Shard count, defaults to 5
- # @param [Integer] number Number of replicas, defaults to 0
+ # @param [Integer] shard_count Shard count, defaults to 5
+ # @param [Integer] replicas Number of replicas, defaults to 0
# @return [ResultSet]
#
# client.create_blob_table("blob_table")
- def create_blob_table(name, shard_count=5, replicas=0)
- stmt = "create blob table #{name} clustered into #{shard_count} shards with (number_of_replicas=#{replicas})"
- execute stmt
+ def create_blob_table(name, shard_count = 5, replicas = 0)
+ stmt = %{CREATE BLOB TABLE "#{name}" CLUSTERED INTO ? SHARDS WITH (number_of_replicas=?)}
+ execute stmt, [shard_count, replicas]
end
# Drop table
# @param [String] table_name, Name of table to drop
# @param [Boolean] blob Needs to be set to true if table is a blob table
# @return [ResultSet]
- def drop_table(table_name, blob=false)
- tbl = blob ? "BLOB TABLE" : "TABLE"
- stmt = %Q{DROP #{tbl} "#{table_name}"}
+ def drop_table(table_name, blob = false)
+ tbl = blob ? 'BLOB TABLE' : 'TABLE'
+ stmt = %(DROP #{tbl} "#{table_name}")
execute(stmt)
end
# List all user tables
# @return [ResultSet]
def show_tables
- execute("select table_name from information_schema.tables where table_schema = 'doc'")
+ execute('select table_name from information_schema.tables where table_schema = ?', [schema])
end
# Returns all tables in schema 'doc'
- # @return [Array] Array of table names
+ # @return [Array<String>] Array of table names
def tables
- execute("select table_name from information_schema.tables where table_schema = 'doc'").map(&:first)
+ execute('select table_name from information_schema.tables where table_schema = ?', [schema]).map(&:first)
end
+ # Returns all tables in schema 'blob'
+ # @return [Array<String>] Array of blob tables
+ def blob_tables
+ execute('select table_name from information_schema.tables where table_schema = ?', ['blob']).map(&:first)
+ end
+
# Executes a SQL statement against the Crate HTTP REST endpoint.
# @param [String] sql statement to execute
# @param [Array] args Array of values used for parameter substitution
- # @param [Hash] Net::HTTP options (open_timeout, read_timeout)
+ # @param [Array] bulk_args List of lists containing records to be processed
+ # @param [Hash] http_options Net::HTTP options (open_timeout, read_timeout)
# @return [ResultSet]
def execute(sql, args = nil, bulk_args = nil, http_options = {})
@logger.debug sql
- req = Net::HTTP::Post.new("/_sql", initheader = {'Content-Type' => 'application/json'})
- body = {"stmt" => sql}
- body.merge!({'args' => args}) if args
- body.merge!({'bulk_args' => bulk_args}) if bulk_args
+ req = Net::HTTP::Post.new('/_sql', headers)
+ body = { 'stmt' => sql }
+ body['args'] = args if args
+ body['bulk_args'] = bulk_args if bulk_args
req.body = body.to_json
response = request(req, http_options)
@logger.debug response.body
- success = case response.code
- when /^2\d{2}/
- ResultSet.new response.body
- else
- @logger.info(response.body)
- raise CrateRuby::CrateError.new(response.body)
- end
- success
+
+ case response.code
+ when /^2\d{2}/
+ ResultSet.new response.body
+ else
+ @logger.info(response.body)
+ raise CrateRuby::CrateError, response.body
+ end
end
# Upload a File to a blob table
# @param [String] table
# @param [String] digest SHA1 hexdigest
# @param [Boolean] data Can be any payload object that can be sent via HTTP, e.g. STRING, FILE
def blob_put(table, digest, data)
uri = blob_path(table, digest)
@logger.debug("BLOB PUT #{uri}")
- req = Net::HTTP::Put.new(blob_path(table, digest))
+ req = Net::HTTP::Put.new(blob_path(table, digest), headers)
req.body = data
response = request(req)
- success = case response.code
- when "201"
- true
- else
- @logger.info("Response #{response.code}: " + response.body)
- false
- end
- success
+ case response.code
+ when '201'
+ true
+ else
+ @logger.info("Response #{response.code}: " + response.body)
+ false
+ end
end
# Download blob
# @param [String] table
# @param [String] digest SHA1 hexdigest
#
# @return [Blob] File data to write to file or use directly
def blob_get(table, digest)
uri = blob_path(table, digest)
@logger.debug("BLOB GET #{uri}")
- req = Net::HTTP::Get.new(uri)
+ req = Net::HTTP::Get.new(uri, headers)
response = request(req)
case response.code
- when "200"
- response.body
- else
- @logger.info("Response #{response.code}: #{response.body}")
- false
+ when '200'
+ response.body
+ else
+ @logger.info("Response #{response.code}: #{response.body}")
+ false
end
end
# Delete blob
# @param [String] table
@@ -161,35 +174,34 @@
#
# @return [Boolean]
def blob_delete(table, digest)
uri = blob_path(table, digest)
@logger.debug("BLOB DELETE #{uri}")
- req = Net::HTTP::Delete.new(uri)
+ req = Net::HTTP::Delete.new(uri, headers)
response = request(req)
- success = case response.code
- when "200"
- true
- else
- @logger.info("Response #{response.code}: #{response.body}")
- false
- end
- success
+ case response.code
+ when '200'
+ true
+ else
+ @logger.info("Response #{response.code}: #{response.body}")
+ false
+ end
end
-
# Return the table structure
# @param [String] table_name Table name to get structure
# @param [ResultSet]
def table_structure(table_name)
- execute("select * from information_schema.columns where table_schema = 'doc' AND table_name = '#{table_name}'")
+ execute('select * from information_schema.columns where table_schema = ?' \
+ 'AND table_name = ?', [schema, table_name])
end
-
def insert(table_name, attributes)
vals = attributes.values
- binds = vals.count.times.map {|i| "$#{i+1}"}.join(',')
- stmt = %Q{INSERT INTO "#{table_name}" (#{attributes.keys.join(', ')}) VALUES(#{binds})}
+ identifiers = attributes.keys.map {|v| %{"#{v}"} }.join(', ')
+ binds = vals.count.times.map { |i| "$#{i + 1}" }.join(',')
+ stmt = %{INSERT INTO "#{table_name}" (#{identifiers}) VALUES(#{binds})}
execute(stmt, vals)
end
# Crate is eventually consistent, If you don't query by primary key,
# it is not guaranteed that an insert record is found on the next
@@ -205,11 +217,11 @@
def blob_path(table, digest)
"/_blobs/#{table}/#{digest}"
end
def connection
- host, port = @servers.first.split(':');
+ host, port = @servers.first.split(':')
Net::HTTP.new(host, port)
end
def request(req, http_options = {})
options = @http_options.merge(http_options)
@@ -217,7 +229,18 @@
options.each { |opt, value| http.send("#{opt}=", value) }
http.request(req)
end
end
+ def headers
+ header = { 'Content-Type' => 'application/json', 'Accept' => 'application/json' }
+ header['Default-Schema'] = schema if schema
+ header['Authorization'] = "Basic #{encrypted_credentials}" if username
+ header['X-User'] = username if username # for backwards compatibility with Crate 2.2
+ header
+ end
+
+ def encrypted_credentials
+ @encrypted_credentials ||= Base64.encode64 "#{username}:#{password}"
+ end
end
end