// vim:ts=2:sts=2:sw=2:expandtab #include "adapter.h" #include "sys/select.h" // Extend the default dbi::FieldSet class with some ruby love. class Fields : public dbi::FieldSet { public: Fields() : dbi::FieldSet(0) {} void operator<<(VALUE v) { VALUE name = TO_S(v); fields.push_back(std::string(RSTRING_PTR(name), RSTRING_LEN(name))); } }; static VALUE cSwiftAdapter; void build_extra_options_string(VALUE key, VALUE value, VALUE ptr) { std::string *optstring = (std::string *)ptr; *optstring += CSTRING(key) + std::string("=") + CSTRING(value) + std::string(";"); } std::string parse_extra_options(VALUE options) { std::string optstring = ""; if (!NIL_P(options)) rb_hash_foreach(options, RUBY_STATIC_FUNC(build_extra_options_string), (VALUE)&optstring); return optstring; } static void adapter_free(dbi::Handle *handle) { if (handle) { handle->conn()->cleanup(); delete handle; } } VALUE adapter_alloc(VALUE klass) { dbi::Handle *handle = 0; return Data_Wrap_Struct(klass, 0, adapter_free, handle); } dbi::Handle* adapter_handle(VALUE self) { dbi::Handle *handle; Data_Get_Struct(self, dbi::Handle, handle); if (!handle) rb_raise(eSwiftRuntimeError, "Invalid object, did you forget to call #super?"); return handle; } /* Begin a transaction (unit of work). @overload commit(name = nil) @param [Symbol, String] name Optional transaction name. @see Swift::Adapter#transaction */ static VALUE adapter_begin(int argc, VALUE *argv, VALUE self) { VALUE save_point; rb_scan_args(argc, argv, "01", &save_point); dbi::Handle *handle = adapter_handle(self); try { NIL_P(save_point) ? handle->begin() : handle->begin(CSTRING(save_point)); } CATCH_DBI_EXCEPTIONS(); return Qtrue; } /* Close the connection. */ static VALUE adapter_close(VALUE self) { dbi::Handle *handle = adapter_handle(self); try { handle->close(); } CATCH_DBI_EXCEPTIONS(); rb_iv_set(self, "@closed", true); return Qtrue; } /* Check if connection is closed. */ static VALUE adapter_closed(VALUE self) { return rb_iv_get(self, "@closed"); } /* Shallow copy of adapter. @note Currently not allowed. @see Object.clone */ static VALUE adapter_clone(VALUE self) { rb_raise(eSwiftRuntimeError, "clone is not allowed."); } /* Commit a transaction (unit of work). @overload commit(name = nil) @param [Symbol, String] name Optional transaction name. */ static VALUE adapter_commit(int argc, VALUE *argv, VALUE self) { VALUE save_point; rb_scan_args(argc, argv, "01", &save_point); dbi::Handle *handle = adapter_handle(self); try { NIL_P(save_point) ? handle->commit() : handle->commit(CSTRING(save_point)); } CATCH_DBI_EXCEPTIONS(); return Qtrue; } /* Shallow copy of adapter. @note Currently not allowed. @see Object.dup */ static VALUE adapter_dup(VALUE self) { rb_raise(eSwiftRuntimeError, "dup is not allowed."); } /* Escape a string. @note Bind values do not need to be escaped. @overload escape(value) @param [String] value String to be escaped. @return [String] */ static VALUE adapter_escape(VALUE self, VALUE value) { if (TYPE(value) != T_STRING) value = TO_S(value); dbi::Handle *handle = adapter_handle(self); try { std::string safe = handle->escape(std::string(RSTRING_PTR(value), RSTRING_LEN(value))); return rb_str_new(safe.data(), safe.length()); } CATCH_DBI_EXCEPTIONS(); } /* Execute a single statement. @example result = User.execute("select * from #{User} where #{User.name} = ?", 'apple') result.first # User object. @overload execute(statement = '', *binds, &block) @param [String] statement Query statement. @param [*Object] binds Bind values. @yield [Swift::Result] @return [Swift::Result] */ static VALUE adapter_execute(int argc, VALUE *argv, VALUE self) { VALUE statement, bind_values, block, rows, scheme = Qnil; dbi::Handle *handle = adapter_handle(self); rb_scan_args(argc, argv, "1*&", &statement, &bind_values, &block); if (TYPE(statement) == T_CLASS) { scheme = statement; statement = rb_ary_shift(bind_values); } try { Query query; query.sql = CSTRING(statement); query.handle = handle; if (RARRAY_LEN(bind_values) > 0) query_bind_values(&query, bind_values); if (dbi::_trace) dbi::logMessage(dbi::_trace_fd, dbi::formatParams(query.sql, query.bind)); if ((rows = rb_thread_blocking_region(((VALUE (*)(void*))query_execute), &query, RUBY_UBF_IO, 0)) == Qfalse) rb_raise(query.error_klass, "%s", query.error_message); VALUE result = result_wrap_handle(cSwiftResult, self, handle->conn()->result(), true); if (!NIL_P(scheme)) rb_iv_set(result, "@scheme", scheme); return rb_block_given_p() ? result_each(result) : result; } CATCH_DBI_EXCEPTIONS(); } /* Reestablish a connection. */ static VALUE adapter_reconnect(VALUE self) { dbi::Handle *handle = adapter_handle(self); try { handle->reconnect(); rb_iv_set(self, "@closed", false); } CATCH_DBI_EXCEPTIONS(); return Qtrue; } /* Setup a new DB connection. You almost certainly want to setup a :default named adapter. The :default scope will be used for unscoped calls to Swift.db. @example Swift.setup :default, Swift::DB::Postgres, db: 'db1' Swift.setup :other, Swift::DB::Postgres, db: 'db2' @overload new(options = {}) @param [Hash] options Connection options @option options [String] :db Name. @option options [String] :user (*nix login user) @option options [String] :password ('') @option options [String] :host ('localhost') @option options [Integer] :port (DB default) @return [Swift::Adapter] @see Swift::DB @see Swift::Adapter */ static VALUE adapter_initialize(VALUE self, VALUE options) { VALUE db = rb_hash_aref(options, ID2SYM(rb_intern("db"))); VALUE driver = rb_hash_aref(options, ID2SYM(rb_intern("driver"))); VALUE user = rb_hash_aref(options, ID2SYM(rb_intern("user"))); if (NIL_P(db)) rb_raise(eSwiftArgumentError, "Adapter#new called without :db"); if (NIL_P(driver)) rb_raise(eSwiftArgumentError, "Adapter#new called without :driver"); user = NIL_P(user) ? current_user() : user; VALUE extra = rb_hash_dup(options); rb_hash_delete(extra, ID2SYM(rb_intern("db"))); rb_hash_delete(extra, ID2SYM(rb_intern("driver"))); rb_hash_delete(extra, ID2SYM(rb_intern("user"))); rb_hash_delete(extra, ID2SYM(rb_intern("password"))); rb_hash_delete(extra, ID2SYM(rb_intern("host"))); rb_hash_delete(extra, ID2SYM(rb_intern("port"))); std::string extra_options_string = parse_extra_options(extra); try { DATA_PTR(self) = new dbi::Handle( CSTRING(driver), CSTRING(user), CSTRING(rb_hash_aref(options, ID2SYM(rb_intern("password")))), CSTRING(db), CSTRING(rb_hash_aref(options, ID2SYM(rb_intern("host")))), CSTRING(rb_hash_aref(options, ID2SYM(rb_intern("port")))), extra_options_string.size() > 0 ? (char*)extra_options_string.c_str() : 0 ); } CATCH_DBI_EXCEPTIONS(); rb_iv_set(self, "@options", options); return Qnil; } /* Prepare a statement for on or more executions. @example sth = User.prepare("select * from #{User} where #{User.name} = ?") sth.execute('apple') #=> Result sth.execute('benny') #=> Result @overload prepare(statement, &block) @param [String] statement Query statement. @return [Swift::Statement] */ static VALUE adapter_prepare(int argc, VALUE *argv, VALUE self) { VALUE sql, scheme, prepared; dbi::AbstractStatement *statement; rb_scan_args(argc, argv, "11", &scheme, &sql); if (TYPE(scheme) != T_CLASS) { sql = scheme; scheme = Qnil; } dbi::Handle *handle = adapter_handle(self); try { // TODO: Move to statement_* constructor. statement = handle->conn()->prepare(CSTRING(sql)); prepared = statement_wrap_handle(cSwiftStatement, self, statement); rb_iv_set(prepared, "@scheme", scheme); rb_iv_set(prepared, "@sql", sql); return prepared; } CATCH_DBI_EXCEPTIONS(); } /* Rollback the current transaction. @overload rollback(name = nil) @param [Symbol, String] name Optional transaction name. */ static VALUE adapter_rollback(int argc, VALUE *argv, VALUE self) { VALUE save_point; dbi::Handle *handle = adapter_handle(self); rb_scan_args(argc, argv, "01", &save_point); try { NIL_P(save_point) ? handle->rollback() : handle->rollback(CSTRING(save_point)); } CATCH_DBI_EXCEPTIONS(); return Qtrue; } /* Block form transaction sugar. @overload transaction(name = nil, &block) @param [Symbol, String] name Optional transaction name. */ static VALUE adapter_transaction(int argc, VALUE *argv, VALUE self) { int status; VALUE sp, block, block_result = Qnil; dbi::Handle *handle = adapter_handle(self); rb_scan_args(argc, argv, "01&", &sp, &block); if (NIL_P(block)) rb_raise(eSwiftArgumentError, "Transaction called without a block."); std::string save_point = NIL_P(sp) ? "SP" + dbi::generateCompactUUID() : CSTRING(sp); try { handle->begin(save_point); block_result = rb_protect(rb_yield, self, &status); if (!status && handle->transactions().size() > 0) { handle->commit(save_point); } else if (status && handle->transactions().size() > 0) { handle->rollback(save_point); rb_jump_tag(status); } } CATCH_DBI_EXCEPTIONS(); return block_result; } /* Bulk insert resources. @overload write(store, fields, stream) @param [Swift::Scheme, String] store Write to store. @param [Array] fields Write to fields in store. @param [IO] stream IO to read from. @note The format of the stream and bulk write performance are entirely down to each adapter. */ static VALUE adapter_write(int argc, VALUE *argv, VALUE self) { uint64_t rows = 0; VALUE stream, table, fields; dbi::Handle *handle = adapter_handle(self); rb_scan_args(argc, argv, "30", &table, &fields, &stream); if (TYPE(stream) != T_STRING && !rb_respond_to(stream, rb_intern("read"))) rb_raise(eSwiftArgumentError, "Stream must be a String or IO object."); if (TYPE(fields) != T_ARRAY) rb_raise(eSwiftArgumentError, "Fields must be an Array."); try { Fields write_fields; for (int i = 0; i < RARRAY_LEN(fields); i++) write_fields << rb_ary_entry(fields, i); /* TODO: Adapter specific code is balls. This is just for the friggin mysql support - mysql does not like a statement close command being send on a handle when the writing has started. */ rb_gc(); if (TYPE(stream) == T_STRING) { dbi::StringIO io(RSTRING_PTR(stream), RSTRING_LEN(stream)); rows = handle->write(RSTRING_PTR(TO_S(table)), write_fields, &io); } else { AdapterIO io(stream); rows = handle->write(RSTRING_PTR(TO_S(table)), write_fields, &io); } return SIZET2NUM(rows); } CATCH_DBI_EXCEPTIONS(); } /* Returns the socket fileno for the connection. @overload fileno() @return [Fixnum] */ VALUE adapter_fileno(VALUE self) { dbi::Handle *handle = adapter_handle(self); return INT2NUM(handle->conn()->socket()); } /* Executes a query asynchronously and returns the result instance. @example @overload async_execute(statement = '', *binds, &block) @param [String] statement Query statement. @param [*Object] binds Bind values. @return [Swift::Result] */ VALUE adapter_async_execute(int argc, VALUE *argv, VALUE self) { VALUE statement, bind_values, block, scheme = Qnil, result; dbi::Handle *handle = adapter_handle(self); rb_scan_args(argc, argv, "1*&", &statement, &bind_values, &block); if (TYPE(statement) == T_CLASS) { scheme = statement; statement = rb_ary_shift(bind_values); } try { dbi::AbstractResult *dbi_result; if (RARRAY_LEN(bind_values) > 0) { Query query; query_bind_values(&query, bind_values); dbi_result = handle->conn()->aexecute(CSTRING(statement), query.bind); } else dbi_result = handle->conn()->aexecute(CSTRING(statement)); result = result_wrap_handle(cSwiftResult, self, dbi_result, true); if (!NIL_P(scheme)) rb_iv_set(result, "@scheme", scheme); // if block given, just use rb_thread_select if (rb_block_given_p()) { rb_thread_wait_fd(handle->socket()); while (dbi_result->consumeResult()); dbi_result->prepareResult(); } } CATCH_DBI_EXCEPTIONS(); return rb_block_given_p() ? result_each(result) : result; } void init_swift_adapter() { VALUE mSwift = rb_define_module("Swift"); cSwiftAdapter = rb_define_class_under(mSwift, "Adapter", rb_cObject); rb_define_method(cSwiftAdapter, "begin", RUBY_METHOD_FUNC(adapter_begin), -1); rb_define_method(cSwiftAdapter, "clone", RUBY_METHOD_FUNC(adapter_clone), 0); rb_define_method(cSwiftAdapter, "close", RUBY_METHOD_FUNC(adapter_close), 0); rb_define_method(cSwiftAdapter, "closed?", RUBY_METHOD_FUNC(adapter_closed), 0); rb_define_method(cSwiftAdapter, "commit", RUBY_METHOD_FUNC(adapter_commit), -1); rb_define_method(cSwiftAdapter, "dup", RUBY_METHOD_FUNC(adapter_dup), 0); rb_define_method(cSwiftAdapter, "escape", RUBY_METHOD_FUNC(adapter_escape), 1); rb_define_method(cSwiftAdapter, "execute", RUBY_METHOD_FUNC(adapter_execute), -1); rb_define_method(cSwiftAdapter, "initialize", RUBY_METHOD_FUNC(adapter_initialize), 1); rb_define_method(cSwiftAdapter, "prepare", RUBY_METHOD_FUNC(adapter_prepare), -1); rb_define_method(cSwiftAdapter, "rollback", RUBY_METHOD_FUNC(adapter_rollback), -1); rb_define_method(cSwiftAdapter, "transaction", RUBY_METHOD_FUNC(adapter_transaction), -1); rb_define_method(cSwiftAdapter, "write", RUBY_METHOD_FUNC(adapter_write), -1); rb_define_method(cSwiftAdapter, "reconnect", RUBY_METHOD_FUNC(adapter_reconnect), 0); // stuff you need for async rb_define_method(cSwiftAdapter, "fileno", RUBY_METHOD_FUNC(adapter_fileno), 0); rb_define_method(cSwiftAdapter, "async_execute", RUBY_METHOD_FUNC(adapter_async_execute), -1); rb_define_alloc_func(cSwiftAdapter, adapter_alloc); }