lib/monetdb/connection.rb in monetdb-0.1.3 vs lib/monetdb/connection.rb in monetdb-0.2.0
- old
+ new
@@ -1,443 +1,125 @@
require "socket"
-require "time"
-require "uri"
+require "monetdb/connection/messages"
+require "monetdb/connection/setup"
+require "monetdb/connection/query"
-# Implements the MAPI communication protocol
-class MonetDB
+module MonetDB
class Connection
- # enable debug output
- @@DEBUG = false
+ include Messages
+ include Setup
+ include Query
- # hour in seconds, used for timezone calculation
- @@HOUR = 3600
+ Q_TABLE = "1" # SELECT statement
+ Q_UPDATE = "2" # INSERT/UPDATE statement
+ Q_CREATE = "3" # CREATE/DROP TABLE statement
+ Q_PREPARE = "5" # QPREPARE message
+ Q_BLOCK = "6" # QBLOCK message
- # maximum size (in bytes) for a monetdb message to be sent
- @@MAX_MESSAGE_SIZE = 32766
+ MSG_ERROR = "!"
+ MSG_QUERY = "&"
+ MSG_SCHEME = "%"
+ MSG_TUPLE = "["
- # endianness of a message sent to the server
+ LANG = "sql"
+ REPLY_SIZE = "-1"
+ MAX_MSG_SIZE = 32766
- # MAPI protocols supported by the driver
+ MAPI_V8 = "8"
+ MAPI_V9 = "9"
- attr_reader :socket, :auto_commit, :transactions, :lang
+ AUTH_MD5 = "MD5"
+ AUTH_SHA512 = "SHA512"
+ AUTH_SHA384 = "SHA384"
+ AUTH_SHA256 = "SHA256"
+ AUTH_SHA1 = "SHA1"
- # A new instance of MonetDB::Connection.
- # * user : username (default is monetdb)
- # * passwd : password (default is monetdb)
- # * lang : language (default is sql)
- # * host : server hostanme or ip (default is localhost)
- # * port : server port (default is 50000)
- def initialize(user = "monetdb", passwd = "monetdb", lang = "sql", host = "", port = "50000")
- @user = user
- @passwd = passwd
- @lang = lang.downcase
- @host = host
- @port = port
- @client_endianness = @@CLIENT_ENDIANNESS
- @auth_iteration = 0
- @connection_established = false
- @transactions =
+ def initialize(config = {})
+ @config = {
+ :host => "localhost",
+ :port => 50000,
+ :username => "monetdb",
+ :password => "monetdb"
+ }.merge(
+ config.inject({}){|h, (k, v)| h[k.to_sym] = v; h}
+ )
- # Connect to the database, creates a new socket.
- def connect(db_name = "demo", auth_type = "SHA1")
- @database = db_name
- @auth_type = auth_type
- @socket =, @port.to_i)
- if real_connect
- if @lang == LANG_SQL
- set_timezone
- set_reply_size
- elsif (@lang == LANG_XQUERY) and XQUERY_OUTPUT_SEQ
- # require xquery output to be in seq format
- send(format_command("output seq"))
- end
- true
- else
- false
- end
+ def connect
+ disconnect if connected?
+ @socket = config[:host], config[:port].to_i
+ setup
+ true
- # Perform a real connection; retrieve challenge, proxy through merovinginan, build challenge and set the timezone.
- def real_connect
- server_challenge = retrieve_server_challenge()
- if server_challenge != nil
- salt = server_challenge.split(':')[0]
- @server_name = server_challenge.split(':')[1]
- @protocol = server_challenge.split(':')[2].to_i
- @supported_auth_types = server_challenge.split(':')[3].split(',')
- @server_endianness = server_challenge.split(':')[4]
- if @protocol == 9
- @pwhash = server_challenge.split(':')[5]
- end
- else
- raise MonetDB::ConnectionError, "Error: server returned an empty challenge string."
- end
- # The server supports only RIPMED168 or crypt as an authentication hash function, but the driver does not.
- if @supported_auth_types.length == 1
- auth = @supported_auth_types[0]
- if auth.upcase == "RIPEMD160" or auth.upcase == "CRYPT"
- raise MonetDB::ConnectionError, auth.upcase + " " + ": algorithm not supported by ruby-monetdb."
- end
- end
- # If the server protocol version is not 8: abort and notify the user.
- if @@SUPPORTED_PROTOCOLS.include?(@protocol) == false
- raise MonetDB::ProtocolError, "Protocol not supported. The current implementation of ruby-monetdb works with MAPI protocols #{@@SUPPORTED_PROTOCOLS} only."
- elsif mapi_proto_v8?
- reply = build_auth_string_v8(@auth_type, salt, @database)
- elsif mapi_proto_v9?
- reply = build_auth_string_v9(@auth_type, salt, @database)
- end
- if @socket != nil
- @connection_established = true
- send(reply)
- monetdb_auth = receive
- if monetdb_auth.length == 0
- # auth succedeed
- true
- else
- if monetdb_auth[0].chr == MSG_REDIRECT
- #redirection
- redirects = [] # store a list of possible redirects
- monetdb_auth.split('\n').each do |m|
- # strip the trailing ^mapi:
- # if the redirect string start with something != "^mapi:" or is empty, the redirect is invalid and shall not be included.
- if m[0..5] == "^mapi:"
- redir = m[6..m.length]
- # url parse redir
- redirects.push(redir)
- else
- $stderr.print "Warning: Invalid Redirect #{m}"
- end
- end
- if redirects.size == 0
- raise MonetDB::ConnectionError, "No valid redirect received"
- else
- begin
- uri = URI.split(redirects[0])
- # Splits the string on following parts and returns array with result:
- #
- # * Scheme
- # * Userinfo
- # * Host
- # * Port
- # * Registry
- # * Path
- # * Opaque
- # * Query
- # * Fragment
- server_name = uri[0]
- host = uri[2]
- port = uri[3]
- database = uri[5].gsub(/^\//, '') if uri[5] != nil
- rescue URI::InvalidURIError
- raise MonetDB::ConnectionError, "Invalid redirect: #{redirects[0]}"
- end
- end
- if server_name == "merovingian"
- if @auth_iteration <= 10
- @auth_iteration += 1
- real_connect
- else
- raise MonetDB::ConnectionError, "Merovingian: too many iterations while proxying."
- end
- elsif server_name == "monetdb"
- begin
- @socket.close
- rescue
- raise MonetDB::ConnectionError, "I/O error while closing connection to #{@socket}"
- end
- # reinitialize a connection
- @host = host
- @port = port
- connect(database, @auth_type)
- else
- @connection_established = false
- raise MonetDB::ConnectionError, monetdb_auth
- end
- elsif monetdb_auth[0].chr == MSG_INFO
- raise MonetDB::ConnectionError, monetdb_auth
- end
- end
- end
+ def connected?
+ !socket.nil?
- def savepoint
- @transactions.savepoint
+ def disconnect
+ socket.disconnect if connected?
+ @socket = nil
- # Formats a <i>command</i> string so that it can be parsed by the server.
- def format_command(x)
- "X" + x + "\n"
- end
+ private
- # Send an 'export' command to the server.
- def set_export(id, idx, offset)
- send(format_command("export " + id.to_s + " " + idx.to_s + " " + offset.to_s ))
+ def config
+ @config
- # Send a 'reply_size' command to the server.
- def set_reply_size
- send(format_command(("reply_size " + REPLY_SIZE)))
- response = receive
- if response == MSG_PROMPT
- true
- elsif response[0] == MSG_INFO
- raise MonetDB::CommandError, "Unable to set reply_size: #{response}"
- end
+ def socket
+ @socket
- def set_output_seq
- send(format_command("output seq"))
+ def log(type, msg)
+ MonetDB.logger.send(type, msg) if MonetDB.logger
- # Disconnect from server.
- def disconnect
- if @connection_established
- begin
- @socket.close
- rescue => e
- $stderr.print e
- end
- else
- raise MonetDB::ConnectionError, "No connection established."
- end
- end
+ def read
+ raise ConnectionError, "Not connected to server" unless connected?
- # Send data to a monetdb5 server instance and returns server response.
- def send(data)
- encode_message(data).each do |m|
- @socket.write(m)
- end
- end
+ length, last_chunk = read_length
+ data, iterations = "", 0
- # Receive data from a monetdb5 server instance.
- def receive
- is_final, chunk_size = recv_decode_hdr
- if chunk_size == 0
- return "" # needed on ruby-1.8.6 linux/64bit; recv(0) hangs on this configuration.
+ while (length > 0) && (iterations < 1000) do
+ received = socket.recv(length)
+ data << received
+ length -= received.bytesize
+ iterations += 1
+ data << read unless last_chunk
- data = @socket.recv(chunk_size)
- if is_final == false
- while is_final == false
- is_final, chunk_size = recv_decode_hdr
- data += @socket.recv(chunk_size)
- end
- end
- # Build and authentication string given the parameters submitted by the user (MAPI protocol v8).
- def build_auth_string_v8(auth_type, salt, db_name)
- # seed = password + salt
- if (auth_type.upcase == "MD5" or auth_type.upcase == "SHA1") and @supported_auth_types.include?(auth_type.upcase)
- auth_type = auth_type.upcase
- digest =, @passwd+salt)
- hashsum = digest.hashsum
- elsif auth_type.downcase == "plain" or not @supported_auth_types.include?(auth_type.upcase)
- auth_type = 'plain'
- hashsum = @passwd + salt
- elsif auth_type.downcase == "crypt"
- auth_type = @supported_auth_types[@supported_auth_types.index(auth_type)+1]
- $stderr.print "The selected hashing algorithm is not supported by the Ruby driver. #{auth_type} will be used instead."
- digest =, @passwd+salt)
- hashsum = digest.hashsum
- else
- # The user selected an auth type not supported by the server.
- raise MonetDB::ConnectionError, "#{auth_type} not supported by the server. Please choose one from #{@supported_auth_types}"
- end
- # Build the reply message with header
- reply = @client_endianness + ":" + @user + ":{" + auth_type + "}" + hashsum + ":" + @lang + ":" + db_name + ":"
+ def read_length
+ bytes = socket.recv(2).unpack("v")[0]
+ [(bytes >> 1), (bytes & 1) == 1]
- # Build and authentication string given the parameters submitted by the user (MAPI protocol v9).
- def build_auth_string_v9(auth_type, salt, db_name)
- if (auth_type.upcase == "MD5" or auth_type.upcase == "SHA1") and @supported_auth_types.include?(auth_type.upcase)
- auth_type = auth_type.upcase
- # Hash the password
- pwhash =, @passwd)
- digest =, pwhash.hashsum + salt)
- hashsum = digest.hashsum
- elsif auth_type.downcase == "plain" # or not @supported_auth_types.include?(auth_type.upcase)
- # Keep it for compatibility with merovingian
- auth_type = 'plain'
- hashsum = @passwd + salt
- elsif @supported_auth_types.include?(auth_type.upcase)
- if auth_type.upcase == "RIPEMD160"
- auth_type = @supported_auth_types[@supported_auth_types.index(auth_type)+1]
- $stderr.print "The selected hashing algorithm is not supported by the Ruby driver. #{auth_type} will be used instead."
- end
- # Hash the password
- pwhash =, @passwd)
- digest =, pwhash.hashsum + salt)
- hashsum = digest.hashsum
- else
- # The user selected an auth type not supported by the server.
- raise MonetDB::ConnectionError, "#{auth_type} not supported by the server. Please choose one from #{@supported_auth_types}"
+ def write(message)
+ raise ConnectionError, "Not connected to server" unless connected?
+ pack(message).each do |chunk|
+ socket.write(chunk)
- # Build the reply message with header
- reply = @client_endianness + ":" + @user + ":{" + auth_type + "}" + hashsum + ":" + @lang + ":" + db_name + ":"
+ true
- # Build a message to be sent to the server.
- def encode_message(msg = "")
- message =
- data = ""
- hdr = 0 # package header
- pos = 0
- is_final = false # last package in the stream
- while (! is_final)
- data = msg[pos..pos+[@@MAX_MESSAGE_SIZE.to_i, (msg.length - pos).to_i].min]
- pos += data.length
- if (msg.length - pos) == 0
- last_bit = 1
- is_final = true
- else
- last_bit = 0
- end
- hdr = [(data.length << 1) | last_bit].pack('v')
- message << hdr + data.to_s # Short Little Endian Encoding
- end
- message.freeze # freeze and return the encode message
- end
- # Used as the first step in the authentication phase; retrieves a challenge string from the server.
- def retrieve_server_challenge
- server_challenge = receive
- end
- # Reads and decodes the header of a server message.
- def recv_decode_hdr
- if @socket != nil
- fb = @socket.recv(1)
- sb = @socket.recv(1)
- # Use execeptions handling to keep compatibility between different ruby
- # versions.
- #
- # Chars are treated differently in ruby 1.8 and 1.9
- # try do to ascii to int conversion using ord (ruby 1.9)
- # and if it fail fallback to character.to_i (ruby 1.8)
- begin
- fb = fb[0].ord
- sb = sb[0].ord
- rescue NoMethodError => one_eight
- fb = fb[0].to_i
- sb = sb[0].to_i
- end
- chunk_size = (sb << 7) | (fb >> 1)
- is_final = false
- if ( (fb & 1) == 1 )
- is_final = true
- end
- # return the size of the chunk (in bytes)
- return is_final, chunk_size
- else
- raise MonetDB::SocketError, "Error while receiving data\n"
- end
- end
- # Sets the time zone according to the Operating System settings.
- def set_timezone
- tz =
- tz_offset = tz.gmt_offset / @@HOUR
- if tz_offset <= 9 # verify minute count!
- tz_offset = "'+0" + tz_offset.to_s + ":00'"
- else
- tz_offset = "'+" + tz_offset.to_s + ":00'"
- end
- query_tz = "sSET TIME ZONE INTERVAL " + tz_offset + " HOUR TO MINUTE;"
- # Perform the query directly within the method
- send(query_tz)
- response = receive
- if response == MSG_PROMPT
- true
- elsif response[0].chr == MSG_INFO
- raise MonetDB::QueryError, response
- end
- end
- # Turns auto commit on/off.
- def set_auto_commit(flag = true)
- if flag == false
- ac = " 0"
- else
- ac = " 1"
- end
- send(format_command("auto_commit " + ac))
- response = receive
- if response == MSG_PROMPT
- @auto_commit = flag
- elsif response[0].chr == MSG_INFO
- raise MonetDB::CommandError, response
- end
- end
- # Check the auto commit status (on/off).
- def auto_commit?
- !!@auto_commit
- end
- # Check if monetdb is running behind the merovingian proxy and forward the connection in case.
- def merovingian?
- @server_name.downcase == "merovingian"
- end
- def mserver?
- @server_name.downcase == "monetdb"
- end
- # Check which protocol is spoken by the server.
- def mapi_proto_v8?
- @protocol == 8
- end
- def mapi_proto_v9?
- @protocol == 9
+ def pack(message)
+ chunks = message.scan(/.{1,#{MAX_MSG_SIZE}}/m)
+ chunks.each_with_index.to_a.collect do |chunk, index|
+ last_bit = (index == chunks.size - 1) ? 1 : 0
+ length = [(chunk.size << 1) | last_bit].pack("v")
+ "#{length}#{chunk}"
+ end.freeze