require 'uri'
require 'cql'

module MonkeyButler
  module Databases
    # Database adapter that talks to Cassandra through a Cql::Client
    # connection.
    #
    # Applied migration versions are recorded in a `schema_migrations` table
    # that lives in the keyspace named by the connection URL's path. Every row
    # is written with `partition_key` 0 (see #insert_version) and every version
    # query filters on that value and orders by `version`.
    class CassandraDatabase < AbstractDatabase
      attr_reader :client, :keyspace

      class << self
        # @return [String] file extension used for migration files
        def migration_ext
          ".cql"
        end

        # @return [Class] Cql::CqlError
        def exception_class
          Cql::CqlError
        end
      end

      # Connects to the host/port given by +url+ and remembers the keyspace
      # taken from the URL path.
      #
      # @param url [URI] must respond to #host, #port and #path; the port
      #   falls back to 9042 when the URL carries none
      def initialize(url)
        super(url)
        options = { host: url.host, port: (url.port || 9042) }
        @client = Cql::Client.connect(options)
        @keyspace = url.path[1..-1] # Drop leading slash
      end

      # Checks whether `schema_migrations` exists in the configured keyspace.
      #
      # The system table is addressed by its qualified name instead of issuing
      # `client.use('system')`, so this probe does not change the keyspace that
      # later unqualified statements (e.g. in #execute_migration) run against.
      #
      # @return [Boolean] true when the lookup returns at least one row
      def migrations_table?
        rows = client.execute "SELECT columnfamily_name FROM system.schema_columnfamilies WHERE keyspace_name='#{keyspace}' AND columnfamily_name='schema_migrations'"
        !rows.empty?
      end

      # @return [Object, nil] the lowest recorded version, or nil when the
      #   table holds no rows
      def origin_version
        client.use(keyspace)
        rows = client.execute("SELECT version FROM schema_migrations WHERE partition_key = 0 ORDER BY version ASC LIMIT 1")
        rows.empty? ? nil : rows.first['version']
      end

      # @return [Object, nil] the highest recorded version, or nil when the
      #   table holds no rows
      def current_version
        client.use(keyspace)
        rows = client.execute("SELECT version FROM schema_migrations WHERE partition_key = 0 ORDER BY version DESC LIMIT 1")
        rows.empty? ? nil : rows.first['version']
      end

      # @return [Array] every recorded version in ascending order
      def all_versions
        client.use(keyspace)
        rows = client.execute("SELECT version FROM schema_migrations WHERE partition_key = 0 ORDER BY version ASC")
        rows.map { |row| row['version'] }
      end

      # Records +version+ in `schema_migrations`.
      #
      # @param version [Object] value bound to the `version` column
      def insert_version(version)
        client.use(keyspace)
        client.execute "INSERT INTO schema_migrations (partition_key, version) VALUES (0, ?)", version
      end

      # Runs each statement of a migration, one client call per statement.
      #
      # The text is split on every ';' and blank fragments are skipped.
      # NOTE(review): a ';' inside a string literal would also split the
      # statement; confirm migrations never contain one.
      #
      # @param cql [String] one or more ';'-separated CQL statements
      def execute_migration(cql)
        cql.split(';').each { |statement| client.execute(statement) unless statement.strip.empty? }
      end

      # Drops the given keyspaces if they exist.
      #
      # @param keyspaces [Array<String>] keyspace names; defaults to the
      #   configured keyspace only
      def drop(keyspaces = [keyspace])
        # `name` rather than `keyspace`, so the block parameter does not
        # shadow the attribute reader.
        keyspaces.each { |name| client.execute "DROP KEYSPACE IF EXISTS #{name}" }
      end

      # Creates the configured keyspace (SimpleStrategy, replication factor 1)
      # and the `schema_migrations` table inside it; both statements use
      # IF NOT EXISTS.
      def create_migrations_table
        client.execute "CREATE KEYSPACE IF NOT EXISTS #{keyspace} WITH replication = {'class' : 'SimpleStrategy', 'replication_factor' : 1};"
        client.execute "CREATE TABLE IF NOT EXISTS #{keyspace}.schema_migrations (partition_key INT, version VARINT, PRIMARY KEY (partition_key, version));"
      end
    end
  end
end