#include "SyncThread.h"
#include "common/RhoTime.h"
#include "common/RhoConf.h"
#include "common/RhoFilePath.h"

#ifndef RHO_NO_RUBY
#include "ruby/ext/rho/rhoruby.h"
#endif //RHO_NO_RUBY

#include "sync/ClientRegister.h"
#include "common/RhoAppAdapter.h"

namespace rho {
namespace sync {

using namespace rho::common;
using namespace rho::db;

IMPLEMENT_LOGCLASS(CSyncThread,"Sync");

// The single CSyncThread instance managed by Create()/Destroy(); 0 while none exists.
CSyncThread* CSyncThread::m_pInstance = 0;

/**
 * Returns the instance stored in m_pInstance, constructing it with `factory`
 * on the first call. When an instance already exists `factory` is ignored.
 */
/*static*/ CSyncThread* CSyncThread::Create(common::IRhoClassFactory* factory)
{
    if ( m_pInstance )
        return m_pInstance;

    m_pInstance = new CSyncThread(factory);
    return m_pInstance;
}

/**
 * Deletes the instance created by Create() (if any) and resets m_pInstance to 0.
 */
/*static*/void CSyncThread::Destroy()
{
    // delete on a null pointer is a no-op, so no explicit guard is required.
    delete m_pInstance;
    m_pInstance = 0;
}

/**
 * Configures the queue and the sync engine from RHOCONF and starts the thread
 * (priority epLow) when a non-empty "syncserver" setting is present.
 */
CSyncThread::CSyncThread(common::IRhoClassFactory* factory) : CThreadQueue(factory)
{
    CThreadQueue::setLogCategory(getLogCategory());

    // Only override the queue's poll interval when the setting is configured.
    if( RHOCONF().isExist("sync_poll_interval") )
        setPollInterval(RHOCONF().getInt("sync_poll_interval"));

    m_oSyncEngine.setFactory(factory);

    LOG(INFO) + "sync_poll_interval: " + RHOCONF().getInt("sync_poll_interval");
    LOG(INFO) + "syncserver: " + RHOCONF().getString("syncserver");
    LOG(INFO) + "bulksync_state: " + RHOCONF().getInt("bulksync_state");

    if ( RHOCONF().getString("syncserver").length() > 0 )
        start(epLow);
}

/**
 * Asks the engine to exit sync, stops the thread (passing
 * SYNC_WAIT_BEFOREKILL_SECONDS to stop()) and then closes all databases.
 */
CSyncThread::~CSyncThread(void)
{
    m_oSyncEngine.exitSync();
    stop(SYNC_WAIT_BEFOREKILL_SECONDS);

    db::CDBAdapter::closeAll();
}

#ifndef RHO_NO_RUBY
/**
 * Value handed back to the caller of a sync request (Ruby build).
 *
 * In non-threaded mode: the notification body as a Ruby string; the stored
 * body is cleared afterwards. Otherwise: Ruby nil.
 */
unsigned long CSyncThread::getRetValue()
{
    if ( !isNoThreadedMode() )
        return rho_ruby_get_NIL();

    unsigned long ret = rho_ruby_create_string( getSyncEngine().getNotify().getNotifyBody().c_str() );
    getSyncEngine().getNotify().cleanNotifyBody();
    return ret;
}
#else
/**
 * Value handed back to the caller of a sync request (build without Ruby).
 *
 * In non-threaded mode: a copy of the notification body made with
 * rho_sync_create_string(), cast to unsigned long; the stored body is cleared
 * afterwards. Otherwise: 0.
 */
unsigned long CSyncThread::getRetValue()
{
    if ( !isNoThreadedMode() )
        return 0;

    unsigned long ret = (unsigned long)rho_sync_create_string( getSyncEngine().getNotify().getNotifyBody().c_str() );
    getSyncEngine().getNotify().cleanNotifyBody();
    return ret;
}
#endif

/**
 * Returns the difference between the current local time and the largest
 * `last_updated` value found in the `sources` table of any DB partition,
 * or 0 when no source has a `last_updated` greater than zero.
 */
int CSyncThread::getLastPollInterval()
{
    uint64 nowTime = CLocalTime().toULong();
    uint64 latestTimeUpdated = 0;

    // NOTE(review): Vector appears here without an element type; its elements
    // are used through c_str(), so presumably Vector<String> - confirm against
    // the declaration of getDBAllPartitionNames().
    Vector arPartNames = db::CDBAdapter::getDBAllPartitionNames();
    for( int i = 0; i < (int)arPartNames.size(); i++ )
    {
        db::CDBAdapter& dbPart = db::CDBAdapter::getDB(arPartNames.elementAt(i).c_str());
        DBResult( res, dbPart.executeSQL("SELECT last_updated from sources") );

        // Track the maximum last_updated across every row of every partition.
        for ( ; !res.isEnd(); res.next() )
        {
            uint64 timeUpdated = res.getUInt64ByIdx(0);
            if ( latestTimeUpdated < timeUpdated )
                latestTimeUpdated = timeUpdated;
        }
    }

    return latestTimeUpdated > 0 ? (int)(nowTime-latestTimeUpdated) : 0;
}

/**
 * Timeout hook: queues a "sync all" command (without status popup) when the
 * queue holds no commands and the poll interval is positive.
 */
void CSyncThread::onTimeout()//throws Exception
{
    if ( isNoCommands() && getPollInterval()>0 )
        addQueueCommandInt(new CSyncCommand(scSyncAll,false));
}

/**
 * Applies the command's m_bShowStatus flag to the notifier and, if reporting
 * is enabled afterwards, shows the "syncronizing_data" status popup.
 */
void CSyncThread::checkShowStatus(CSyncCommand& oSyncCmd)
{
    m_oSyncEngine.getNotify().enableReporting(oSyncCmd.m_bShowStatus);

    if (m_oSyncEngine.getNotify().isReportingEnabled())
        m_oSyncEngine.getNotify().showStatusPopup(RhoAppAdapter.getMessageText("syncronizing_data"));
    //m_statusListener.createStatusPopup(RhoRuby.getMessageText("syncronizing_data"));
}

/**
 * Executes one queued command by dispatching on its m_nCmdCode.
 * Command codes without a case below are ignored.
 *
 * @param pCmd queued command; it is cast to CSyncCommand without a check.
 */
void CSyncThread::processCommand(IQueueCommand* pCmd)
{
    CSyncCommand& oSyncCmd = *((CSyncCommand*)pCmd);

    switch(oSyncCmd.m_nCmdCode)
    {
    case scSyncAll:
        checkShowStatus(oSyncCmd);
        m_oSyncEngine.doSyncAllSources();
        break;

    case scSyncOne:
        {
            checkShowStatus(oSyncCmd);
            // The source is identified by the pair (numeric param, string param).
            m_oSyncEngine.doSyncSource(CSyncEngine::CSourceID(oSyncCmd.m_nCmdParam,oSyncCmd.m_strCmdParam));
        }
        break;

    case scSearchOne:
        {
            CSyncSearchCommand& oSearchCmd = (CSyncSearchCommand&)oSyncCmd;

            checkShowStatus(oSyncCmd);
            m_oSyncEngine.doSearch( oSearchCmd.m_arSources, oSyncCmd.m_strCmdParam,
                oSearchCmd.m_strFrom, oSearchCmd.m_bSyncChanges,
                oSyncCmd.m_nCmdParam);
        }
        break;

    case scLogin:
        {
            CSyncLoginCommand& oLoginCmd = (CSyncLoginCommand&)oSyncCmd;

            checkShowStatus(oSyncCmd);
            m_oSyncEngine.login(oLoginCmd.m_strName, oLoginCmd.m_strPassword, *oLoginCmd.m_pNotify );
        }
        break;
    }
}

/**
 * Forwards the new poll interval to CThreadQueue.
 */
void CSyncThread::setPollInterval(int nInterval)
{
//    if ( nInterval == 0 )
//        m_oSyncEngine.stopSync();

    CThreadQueue::setPollInterval(nInterval);
}

/**
 * Returns a short name for the command code, or "Unknown; Code : <n>" for a
 * code that has no name here.
 */
String CSyncThread::CSyncCommand::toString()
{
    switch(m_nCmdCode)
    {
    case scNone:
        return "CheckPollInterval";
    case scSyncAll:
        return "SyncAll";
    case scSyncOne:
        return "SyncOne";
    case scLogin:
        return "Login";
    case scSearchOne:
        return "Search";
    }

    return "Unknown; Code : " + convertToStringA(m_nCmdCode);
}

} // namespace sync
} // namespace rho

extern "C" {

using namespace rho::sync;
using namespace rho::db;

/**
 * Queues a sync of all sources.
 * @param show_status_popup non-zero to request the status popup.
 * @return CSyncThread::getRetValue().
 */
unsigned long rho_sync_doSyncAllSources(int show_status_popup)
{
    CSyncThread::getInstance()->addQueueCommand(
        new CSyncThread::CSyncCommand(CSyncThread::scSyncAll,show_status_popup!=0));

    return CSyncThread::getInstance()->getRetValue();
}

/**
 * Queues a sync of the source with numeric id nSrcID (empty name, no status popup).
 * @return CSyncThread::getRetValue().
 */
unsigned long rho_sync_doSyncSourceByID(int nSrcID)
{
    CSyncThread::getInstance()->addQueueCommand(
        new CSyncThread::CSyncCommand(CSyncThread::scSyncOne, "", nSrcID, false ) );

    return CSyncThread::getInstance()->getRetValue();
}

/**
 * Queues a sync of the source named szSrcName (numeric id 0, no status popup).
 * @return CSyncThread::getRetValue().
 */
unsigned long rho_sync_doSyncSourceByName(const char* szSrcName)
{
    CSyncThread::getInstance()->addQueueCommand(
        new CSyncThread::CSyncCommand(CSyncThread::scSyncOne, szSrcName, 0, false ) );

    return CSyncThread::getInstance()->getRetValue();
}

#ifndef RHO_NO_RUBY
/**
 * Queues a sync of one source identified by a Ruby value that
 * rho_ruby_getstringorint() converts to a string and/or an integer.
 * @param show_status_popup non-zero to request the status popup.
 * @return CSyncThread::getRetValue().
 */
unsigned long rho_sync_doSyncSource(unsigned long nSrcID,int show_status_popup)
{
    CRhoRubyStringOrInt oSrcID = rho_ruby_getstringorint(nSrcID);

    CSyncThread::getInstance()->addQueueCommand(
        new CSyncThread::CSyncCommand(CSyncThread::scSyncOne, oSrcID.m_szStr, (int)oSrcID.m_nInt, show_status_popup!=0 ) );

    return CSyncThread::getInstance()->getRetValue();
}
#endif //RHO_NO_RUBY

/**
 * If the engine reports a sync in progress: requests a user stop, calls
 * stopWait() on the thread, then polls (sleep(100) per iteration) until no
 * database reports being inside a transaction. Does nothing but log otherwise.
 */
void rho_sync_stop()
{
    LOG(INFO)+"STOP sync";

    if ( !CSyncThread::getSyncEngine().isSyncing() )
        return;

    LOG(INFO)+"STOP sync in progress.";
    CSyncThread::getSyncEngine().stopSyncByUser();
    CSyncThread::getInstance()->stopWait();

    while( CDBAdapter::isAnyInsideTransaction() )
        CSyncThread::getInstance()->sleep(100);
}

/**
 * Enumeration callback used by rho_sync_doSearch: appends szName to the
 * vector that parSources points to.
 */
extern "C" void source_iter(const char* szName, void* parSources)
{
    // NOTE(review): rho::Vector appears without an element type in this file;
    // presumably rho::Vector<rho::String> - confirm against the declaration.
    rho::Vector& arSources = *((rho::Vector*)(parSources));
    arSources.addElement(szName);
}

#ifndef RHO_NO_RUBY
/**
 * Stops any running sync and queues a search over the sources listed in the
 * Ruby string array ar_sources.
 *
 * A search notification is registered only when `callback` is a non-empty
 * string; a null callback_params is treated as "".
 * @return CSyncThread::getRetValue().
 */
unsigned long rho_sync_doSearch(unsigned long ar_sources, const char *from, const char *params, bool sync_changes, int nProgressStep,
    const char* callback, const char* callback_params)
{
    rho_sync_stop();

    if ( callback && *callback )
        CSyncThread::getSyncEngine().getNotify().setSearchNotification(
            new CSyncNotification( callback, callback_params ? callback_params : "", true) );

    rho::Vector arSources;
    rho_ruby_enum_strary(ar_sources, source_iter, &arSources);

    CSyncThread::getInstance()->addQueueCommand(
        new CSyncThread::CSyncSearchCommand(from,params,arSources,sync_changes,nProgressStep) );

    return CSyncThread::getInstance()->getRetValue();
}
#endif //RHO_NO_RUBY

/**
 * Stops any running sync and queues a search. ar_sources is reinterpreted as a
 * pointer to a rho::Vector of source names and is dereferenced unconditionally.
 *
 * A search notification is registered only when `callback` is non-null.
 * @return CSyncThread::getRetValue().
 */
unsigned long rho_sync_doSearchByNames(unsigned long ar_sources, const char *from, const char *params, bool sync_changes, int nProgressStep,
    /*RHOC_CALLBACK*/void* callback, void* callback_data)
{
    rho_sync_stop();

    if ( callback )
        CSyncThread::getSyncEngine().getNotify().setSearchNotification(
            new CSyncNotification( (RHOC_CALLBACK)callback, callback_data, true ) );

    rho::Vector& arSources = *((rho::Vector*)ar_sources);

    CSyncThread::getInstance()->addQueueCommand(
        new CSyncThread::CSyncSearchCommand(from,params,arSources,sync_changes,nProgressStep) );

    return CSyncThread::getInstance()->getRetValue();
}

/**
 * Sets the poll interval on the sync thread.
 * @return the poll interval that was in effect before the call.
 */
int rho_sync_set_pollinterval(int nInterval)
{
    int nOldInterval = CSyncThread::getInstance()->getPollInterval();
    CSyncThread::getInstance()->setPollInterval(nInterval);

    return nOldInterval;
}

/** Returns the sync thread's current poll interval. */
int rho_sync_get_pollinterval()
{
    return CSyncThread::getInstance()->getPollInterval();
}

/**
 * Stops any running sync and hands `syncserver` to the engine.
 *
 * Non-empty server: starts the sync thread (epLow) and, if a CClientRegister
 * instance exists, calls its startUp().
 * Null/empty server: stops the sync thread and, if a CClientRegister instance
 * exists, stops it too (both with SYNC_WAIT_BEFOREKILL_SECONDS).
 */
void rho_sync_set_syncserver(const char* syncserver)
{
    rho_sync_stop();

    CSyncThread::getSyncEngine().setSyncServer(syncserver);

    if ( syncserver && *syncserver )
    {
        CSyncThread::getInstance()->start(CSyncThread::epLow);
        if ( CClientRegister::getInstance() != null )
            CClientRegister::getInstance()->startUp();
    }
    else
    {
        CSyncThread::getInstance()->stop(CSyncThread::SYNC_WAIT_BEFOREKILL_SECONDS);
        if ( CClientRegister::getInstance() != null )
            CClientRegister::getInstance()->stop(CSyncThread::SYNC_WAIT_BEFOREKILL_SECONDS);
    }
}

/**
 * Stops any running sync and queues a login command whose notification is
 * built from the string `callback`.
 * @return CSyncThread::getRetValue().
 */
unsigned long rho_sync_login(const char *name, const char *password, const char* callback)
{
    rho_sync_stop();

    // A null callback is passed on as "", matching how the optional
    // callback_params / params strings are handled elsewhere in this file.
    CSyncThread::getInstance()->addQueueCommand(
        new CSyncThread::CSyncLoginCommand(name, password, new CSyncNotification(callback ? callback : "", "", false) ) );

    return CSyncThread::getInstance()->getRetValue();
}

/**
 * Same as rho_sync_login, but the notification is built from a C callback
 * (cast to RHOC_CALLBACK) and its callback_data.
 * @return CSyncThread::getRetValue().
 */
unsigned long rho_sync_login_c(const char *name, const char *password, /*RHOC_CALLBACK*/void* callback, void* callback_data)
{
    rho_sync_stop();

    CSyncThread::getInstance()->addQueueCommand(
        new CSyncThread::CSyncLoginCommand(name, password, new CSyncNotification((RHOC_CALLBACK)callback,callback_data,false)) );

    return CSyncThread::getInstance()->getRetValue();
}

/** Returns 1 when the engine reports a logged-in state, 0 otherwise. */
int rho_sync_logged_in()
{
    //CDBAdapter& db = CDBAdapter::getUserDB();
    return CSyncThread::getSyncEngine().isLoggedIn() ? 1 : 0;
}

/**
 * Stops any running sync, requests a user stop on the engine and logs out.
 */
void rho_sync_logout()
{
    LOG(INFO) + "Logout";

    rho_sync_stop();

    //CDBAdapter& db = CDBAdapter::getUserDB();
    LOG(INFO) + "stopSyncByUser";
    CSyncThread::getSyncEngine().stopSyncByUser();
    CSyncThread::getSyncEngine().logout();
}

/**
 * Registers a URL-based sync notification for source_id. A null `params` is
 * treated as ""; the notification's third constructor argument is true unless
 * source_id is -1.
 */
void rho_sync_set_notification(int source_id, const char *url, char* params)
{
    CSyncThread::getSyncEngine().getNotify().setSyncNotification(source_id,
        new CSyncNotification(url, params ? params : "", source_id != -1) );
}

/**
 * Registers a C-callback sync notification for source_id; the notification's
 * third constructor argument is true unless source_id is -1.
 */
void rho_sync_set_notification_c(int source_id, /*RHOC_CALLBACK*/void* callback, void* callback_data)
{
    CSyncThread::getSyncEngine().getNotify().setSyncNotification(source_id,
        new CSyncNotification((RHOC_CALLBACK)callback, callback_data, source_id != -1) );
}

/** Clears the sync notification registered for source_id. */
void rho_sync_clear_notification(int source_id)
{
    CSyncThread::getSyncEngine().getNotify().clearSyncNotification(source_id);
}

#ifndef RHO_NO_RUBY
/** Currently always returns 0; the attribute lookup is commented out. */
unsigned long rho_sync_get_attrs(const char* szPartition, int nSrcID)
{
    return 0;//(VALUE)CDBAdapter::getDB(szPartition).getAttrMgr().getAttrsBySrc(nSrcID);
}

/**
 * Returns a Ruby boolean telling whether the attribute manager of partition
 * szPartition reports szAttrName of source nSrcID as a blob attribute.
 */
unsigned long rho_sync_is_blob_attr(const char* szPartition, int nSrcID, const char* szAttrName)
{
    return rho_ruby_create_boolean(
        CDBAdapter::getDB(szPartition).getAttrMgr().isBlobAttr(nSrcID, szAttrName) );
}
#endif //RHO_NO_RUBY

/** Forwards szUrl to CSyncNotify::setObjectNotifyUrl(). */
void rho_sync_setobjectnotify_url(const char* szUrl)
{
    CSyncNotify::setObjectNotifyUrl(szUrl);
}

/** Adds an object notification for szObject of the source with id nSrcID. */
void rho_sync_addobjectnotify(int nSrcID, const char* szObject)
{
    CSyncThread::getSyncEngine().getNotify().addObjectNotify(nSrcID, szObject);
}

/** Adds an object notification for szObject of the source named szSrcName. */
void rho_sync_addobjectnotify_bysrcname(const char* szSrcName, const char* szObject)
{
    CSyncThread::getSyncEngine().getNotify().addObjectNotify(szSrcName, szObject);
}

/** Removes all object notifications. */
void rho_sync_cleanobjectnotify()
{
    CSyncThread::getSyncEngine().getNotify().cleanObjectNotifications();
}

/** Returns the notifier's last-sync object count for source nSrcID. */
int rho_sync_get_lastsync_objectcount(int nSrcID)
{
    return CSyncThread::getSyncEngine().getNotify().getLastSyncObjectCount(nSrcID);
}

/** Returns the engine's sync page size. */
int rho_sync_get_pagesize()
{
    return CSyncThread::getSyncEngine().getSyncPageSize();
}

/** Sets the engine's sync page size. */
void rho_sync_set_pagesize(int nPageSize)
{
    CSyncThread::getSyncEngine().setSyncPageSize(nPageSize);
}

/**
 * Switches threaded mode: b == 0 puts both the sync thread and the engine in
 * non-threaded mode, any other value takes them out of it.
 */
void rho_sync_set_threaded_mode(int b)
{
    const bool bNonThreaded = (b == 0);

    CSyncThread::getInstance()->setNonThreadedMode(bNonThreaded);
    CSyncThread::getSyncEngine().setNonThreadedMode(bNonThreaded);
}

/**
 * Returns a strdup() copy of szStr, to be released with rho_sync_free_string().
 * Returns null when szStr is null (strdup must not be given a null pointer).
 */
char* rho_sync_create_string(const char* szStr)
{
    if ( !szStr )
        return 0;

    return strdup(szStr);
}

/** Releases a string obtained from rho_sync_create_string(); null is accepted. */
void rho_sync_free_string(char* szStr)
{
    free(szStr);
}

/** Enables the status popup when b is non-zero, disables it when b is 0. */
void rho_sync_enable_status_popup(int b)
{
    CSyncThread::getSyncEngine().getNotify().enableStatusPopup(b != 0);
}

/** Sets property szPropName = szPropValue in the source options of source nSrcID. */
void rho_sync_set_source_property(int nSrcID, const char* szPropName, const char* szPropValue)
{
    CSyncEngine::getSourceOptions().setProperty(nSrcID, szPropName, szPropValue);
}

/** Passes (b != 0) to sslVerifyPeer() on the engine's network object. */
void rho_sync_set_ssl_verify_peer(int b)
{
    CSyncThread::getSyncEngine().getNet().sslVerifyPeer(b != 0);
}

}