require File.join(File.dirname(__FILE__), '_lib.rb') require 'mosql/cli' class MoSQL::Test::Functional::StreamerTest < MoSQL::Test::Functional def build_streamer MoSQL::Streamer.new(:mongo => mongo, :tailer => nil, :options => {}, :sql => @adapter, :schema => @map) end describe 'with a basic schema' do TEST_MAP = < BSON::ObjectId.new, 'var' => 17 } @adapter.upsert_ns('mosql_test.collection', o) @streamer.handle_op({ 'ns' => 'mosql_test.collection', 'op' => 'u', 'o2' => { '_id' => o['_id'] }, 'o' => { 'var' => 27 } }) assert_equal(27, sequel[:sqltable].where(:_id => o['_id'].to_s).select.first[:var]) end it 'applies ops performed via applyOps' do o = { '_id' => BSON::ObjectId.new, 'var' => 17 } @adapter.upsert_ns('mosql_test.collection', o) op = { 'ns' => 'mosql_test.collection', 'op' => 'u', 'o2' => { '_id' => o['_id'] }, 'o' => { 'var' => 27 } } @streamer.handle_op({ 'op' => 'c', 'ns' => 'mosql_test.$cmd', 'o' => { 'applyOps' => [op] } }) assert_equal(27, sequel[:sqltable].where(:_id => o['_id'].to_s).select.first[:var]) end it 'handle "d" ops with BSON::ObjectIds' do o = { '_id' => BSON::ObjectId.new, 'var' => 17 } @adapter.upsert_ns('mosql_test.collection', o) @streamer.handle_op({ 'ns' => 'mosql_test.collection', 'op' => 'd', 'o' => { '_id' => o['_id'] }, }) assert_equal(0, sequel[:sqltable].where(:_id => o['_id'].to_s).count) end it 'handle "u" ops with $set and BSON::ObjectIDs' do o = { '_id' => BSON::ObjectId.new, 'var' => 17 } @adapter.upsert_ns('mosql_test.collection', o) # $set's are currently a bit of a hack where we read the object # from the db, so make sure the new object exists in mongo connect_mongo['mosql_test']['collection'].insert(o.merge('var' => 100), :w => 1) @streamer.handle_op({ 'ns' => 'mosql_test.collection', 'op' => 'u', 'o2' => { '_id' => o['_id'] }, 'o' => { '$set' => { 'var' => 100 } }, }) assert_equal(100, sequel[:sqltable].where(:_id => o['_id'].to_s).select.first[:var]) end it 'handle "u" ops with $set and a renamed _id' do o = { '_id' => BSON::ObjectId.new, 'goats' => 96 } @adapter.upsert_ns('mosql_test.renameid', o) # $set's are currently a bit of a hack where we read the object # from the db, so make sure the new object exists in mongo connect_mongo['mosql_test']['renameid'].insert(o.merge('goats' => 0), :w => 1) @streamer.handle_op({ 'ns' => 'mosql_test.renameid', 'op' => 'u', 'o2' => { '_id' => o['_id'] }, 'o' => { '$set' => { 'goats' => 0 } }, }) assert_equal(0, sequel[:sqltable2].where(:id => o['_id'].to_s).select.first[:goats]) end it 'handles "d" ops with a renamed id' do o = { '_id' => BSON::ObjectId.new, 'goats' => 1 } @adapter.upsert_ns('mosql_test.renameid', o) @streamer.handle_op({ 'ns' => 'mosql_test.renameid', 'op' => 'd', 'o' => { '_id' => o['_id'] }, }) assert_equal(0, sequel[:sqltable2].where(:id => o['_id'].to_s).count) end it 'filters unwanted records' do data = [{:_id => BSON::ObjectId.from_time(Time.utc(2014, 7, 1)), :var => 2}, {:_id => BSON::ObjectId.from_time(Time.utc(2014, 7, 2)), :var => 3}] collection = mongo["filter_test"]["collection"] collection.drop data.map { |rec| collection.insert(rec)} @streamer.options[:skip_tail] = true @streamer.initial_import inserted_records = @sequel[:filter_sqltable].select assert_equal(1, inserted_records.count) record = inserted_records.first data[1][:_id] = data[1][:_id].to_s assert_equal(data[1], record) end it 'handles "u" ops with a compsite key' do date = Time.utc(2014, 7, 1) o = {'_id' => {'s' => 'asdf', 't' => date}, 'var' => 'data'} collection = mongo["composite_key_test"]["collection"] collection.drop collection.insert(o) @streamer.options[:skip_tail] = true @streamer.initial_import collection.update({ '_id' => { 's' => 'asdf', 't' => date}}, { '$set' => { 'var' => 'new_data'}}) @streamer.handle_op({'ns' => 'composite_key_test.collection', 'op' => 'u', 'o2' => { '_id' => { 's' => 'asdf', 't' => date}}, 'o' => { '$set' => { 'var' => 'new_data'}} }) assert_equal(0, @sequel[:composite_table].where(:var => "data").count) assert_equal(1, @sequel[:composite_table].where(:var => "new_data").count) end it 'handles composite keys' do o = {'_id' => {'s' => 'asdf', 't' => Time.new}, 'var' => 'data'} collection = mongo["composite_key_test"]["collection"] collection.drop collection.insert(o) @streamer.options[:skip_tail] = true @streamer.initial_import assert_equal(1, @sequel[:composite_table].count) end describe '.bulk_upsert' do it 'inserts multiple rows' do objs = [ { '_id' => BSON::ObjectId.new, 'var' => 0 }, { '_id' => BSON::ObjectId.new, 'var' => 1, 'arry' => [1, 2] }, { '_id' => BSON::ObjectId.new, 'var' => 3 }, ].map { |o| @map.transform('mosql_test.collection', o) } @streamer.bulk_upsert(sequel[:sqltable], 'mosql_test.collection', objs) assert(sequel[:sqltable].where(:_id => objs[0].first, :var => 0).count) assert(sequel[:sqltable].where(:_id => objs[1].first, :var => 1).count) assert(sequel[:sqltable].where(:_id => objs[2].first, :var => 3).count) end it 'upserts' do _id = BSON::ObjectId.new objs = [ { '_id' => _id, 'var' => 0 }, { '_id' => BSON::ObjectId.new, 'var' => 1 }, { '_id' => BSON::ObjectId.new, 'var' => 3 }, ].map { |o| @map.transform('mosql_test.collection', o) } @streamer.bulk_upsert(sequel[:sqltable], 'mosql_test.collection', objs) newobjs = [ { '_id' => _id, 'var' => 117 }, { '_id' => BSON::ObjectId.new, 'var' => 32 }, ].map { |o| @map.transform('mosql_test.collection', o) } @streamer.bulk_upsert(sequel[:sqltable], 'mosql_test.collection', newobjs) assert(sequel[:sqltable].where(:_id => newobjs[0].first, :var => 117).count) assert(sequel[:sqltable].where(:_id => newobjs[1].first, :var => 32).count) end describe 'when working with --unsafe' do it 'raises on error by default' do assert_raises(Sequel::DatabaseError) do @streamer.handle_op({ 'ns' => 'mosql_test.collection', 'op' => 'u', 'o2' => { '_id' => 'a' }, 'o' => { 'var' => 1 << 62 }, }) end end it 'does not raises on error with :unsafe' do @streamer.options[:unsafe] = true @streamer.handle_op({ 'ns' => 'mosql_test.collection', 'op' => 'u', 'o2' => { '_id' => 'a' }, 'o' => { 'var' => 1 << 62 }, }) assert_equal(0, sequel[:sqltable].where(:_id => 'a').count) end end end end describe 'when dealing with aliased dbs' do ALIAS_MAP = < _id, :var => i}, :w => 1) end @streamer.options[:skip_tail] = true @streamer.initial_import sqlobjs = @sequel[:sqltable].select.to_a assert_equal(ids.map(&:to_s).sort, sqlobjs.map { |o| o[:_id] }.sort) end end describe 'timestamps' do TIMESTAMP_MAP = < {"t" => 1408647630, "i" => 4}, "h" => -965650193548512059, "v" => 2, "op" => "i", "ns" => "db.has_timestamp", "o" => mongo['db']['has_timestamp'].find_one({_id: id}) }) got = @sequel[:has_timestamp].where(:_id => id.to_s).select.first[:ts] assert_equal(ts.to_i, got.to_i) assert_equal(ts.tv_usec, got.tv_usec) end end end