/* * rhodes * * Copyright (C) 2008 Rhomobile, Inc. All rights reserved. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ package com.rho.sync; import com.rho.RhoClassFactory; import com.rho.RhoConf; import com.rho.RhoEmptyLogger; import com.rho.RhoEmptyProfiler; import com.rho.RhoLogger; import com.rho.RhoProfiler; import com.rho.RhoRuby; import com.rho.db.*; import com.rho.net.*; import com.rho.*; import java.io.IOException; import java.util.Vector; import java.util.Hashtable; public class SyncEngine implements NetRequest.IRhoSession { private static final RhoLogger LOG = RhoLogger.RHO_STRIP_LOG ? new RhoEmptyLogger() : new RhoLogger("Sync"); private static final RhoProfiler PROF = RhoProfiler.RHO_STRIP_PROFILER ? new RhoEmptyProfiler() : new RhoProfiler(); public static final int esNone = 0, esSyncAllSources = 1, esSyncSource = 2, esStop = 3, esExit = 4; static String SYNC_SOURCE_FORMAT() { return "?format=json"; } int SYNC_VERSION() { return 2; } static String SYNC_ASK_ACTION() { return "/ask"; } static class SourceID { String m_strName = ""; String m_strUrl = ""; int m_nID; public String toString() { if ( m_strName.length() > 0 ) return "name : " + m_strName; else if ( m_strUrl.length() > 0 ) return "url : " + m_strUrl; return "# : " + m_nID; } boolean isEqual(SyncSource src) { if ( m_strName.length() > 0 ) return src.getName().equals(m_strName); else if ( m_strUrl.length() > 0 ) { URI uri1 = new URI(m_strUrl); URI uri2 = new URI(src.getUrl()); return uri1.getPath().compareTo(uri2.getPath()) == 0; //return src.getUrl().equals(m_strUrl); } return m_nID == src.getID().intValue(); } }; Vector/**/ m_sources = new Vector(); DBAdapter m_dbAdapter; NetRequest m_NetRequest; int m_syncState; String m_clientID = ""; Mutex m_mxLoadClientID = new Mutex(); String m_strSession = ""; SyncNotify m_oSyncNotify = new SyncNotify(this); boolean m_bStopByUser = false; int m_nSyncPageSize = 2000; void setState(int eState){ m_syncState = eState; } int getState(){ return m_syncState; } boolean isContinueSync(){ return m_syncState != esExit && m_syncState != esStop; } boolean isSyncing(){ return m_syncState == esSyncAllSources || m_syncState == esSyncSource; } void stopSync(){ if (isContinueSync()){ setState(esStop); m_NetRequest.cancel(); } } void stopSyncByUser(){ m_bStopByUser = true; stopSync(); } void exitSync(){ setState(esExit); m_NetRequest.cancel(); } boolean isStoppedByUser(){ return m_bStopByUser; } String getClientID(){ return m_clientID; } void setSession(String strSession){m_strSession=strSession;} public String getSession(){ return m_strSession; } boolean isSessionExist(){ return m_strSession != null && m_strSession.length() > 0; } DBAdapter getDB(){ return m_dbAdapter; } SyncNotify getNotify(){ return m_oSyncNotify; } NetRequest getNet() { return m_NetRequest;} SyncEngine(DBAdapter db){ m_dbAdapter = db; m_NetRequest = null; m_syncState = esNone; } String SYNC_PAGE_SIZE(){ return Integer.toString(m_nSyncPageSize); } int getSyncPageSize() { return m_nSyncPageSize; } void setSyncPageSize(int nPageSize){ m_nSyncPageSize = nPageSize; } void setFactory(RhoClassFactory factory)throws Exception{ m_NetRequest = RhoClassFactory.createNetRequest(); } void doSyncAllSources() { LOG.INFO("Sync all sources started."); setState(esSyncAllSources); m_bStopByUser = false; try { loadAllSources(); m_strSession = loadSession(); if ( isSessionExist() ) { m_clientID = loadClientID(); getNotify().cleanLastSyncObjectCount(); PROF.CREATE_COUNTER("Net"); PROF.CREATE_COUNTER("Parse"); PROF.CREATE_COUNTER("DB"); PROF.CREATE_COUNTER("Data"); PROF.CREATE_COUNTER("Data1"); PROF.CREATE_COUNTER("Pull"); PROF.START("Sync"); syncAllSources(); PROF.DESTROY_COUNTER("Net"); PROF.DESTROY_COUNTER("Parse"); PROF.DESTROY_COUNTER("DB"); PROF.DESTROY_COUNTER("Data"); PROF.DESTROY_COUNTER("Data1"); PROF.DESTROY_COUNTER("Pull"); PROF.STOP("Sync"); } else { if ( m_sources.size() > 0 ) { SyncSource src = (SyncSource)m_sources.elementAt(getStartSource()); //src.m_strError = "Client is not logged in. No sync will be performed."; src.m_nErrCode = RhoRuby.ERR_CLIENTISNOTLOGGEDIN; getNotify().fireSyncNotification(src, true, src.m_nErrCode, ""); }else getNotify().fireSyncNotification(null, true, RhoRuby.ERR_CLIENTISNOTLOGGEDIN, ""); } }catch(Exception exc) { LOG.ERROR("Sync failed.", exc); } if ( getState() != esExit ) setState(esNone); } void doSyncSource(SourceID oSrcID, String strParams, String strAction, boolean bSearchSyncChanges, int nProgressStep) { setState(esSyncSource); m_bStopByUser = false; SyncSource src = null; try { loadAllSources(); src = findSource(oSrcID); if ( src != null ) { LOG.INFO("Started synchronization of the data source: " + src.getName() ); src.m_strParams = strParams; src.m_strAction = strAction; src.m_bSearchSyncChanges = bSearchSyncChanges; src.m_nProgressStep = nProgressStep; if ( oSrcID.m_strUrl.length() != 0 ) { try{ URI uri = new URI(oSrcID.m_strUrl); src.setUrlParams(uri.getQueryString()); if (uri.getScheme()!= null && uri.getScheme().length()>0) src.setUrl(uri.getPathSpecificPart()); }catch(Exception exc) { LOG.ERROR("Malformed url when sync by url.", exc); } } m_strSession = loadSession(); if ( isSessionExist() ) { m_clientID = loadClientID(); if ( getState() != esStop ) { getNotify().cleanLastSyncObjectCount(); PROF.CREATE_COUNTER("Net"); PROF.CREATE_COUNTER("Parse"); PROF.CREATE_COUNTER("DB"); PROF.CREATE_COUNTER("Data"); PROF.CREATE_COUNTER("Data1"); PROF.CREATE_COUNTER("Pull"); PROF.START("Sync"); src.sync(); PROF.DESTROY_COUNTER("Net"); PROF.DESTROY_COUNTER("Parse"); PROF.DESTROY_COUNTER("DB"); PROF.DESTROY_COUNTER("Data"); PROF.DESTROY_COUNTER("Data1"); PROF.DESTROY_COUNTER("Pull"); PROF.STOP("Sync"); } } else { //src.m_strError = "Client is not logged in. No sync will be performed."; src.m_nErrCode = RhoRuby.ERR_CLIENTISNOTLOGGEDIN; } getNotify().fireSyncNotification(src, true, src.m_nErrCode, src.m_nErrCode == RhoRuby.ERR_NONE ? RhoRuby.getMessageText("sync_completed") : ""); } else { src = new SyncSource(this); //src.m_strError = "Unknown sync source."; src.m_nErrCode = RhoRuby.ERR_RUNTIME; throw new RuntimeException("Sync one source : Unknown Source " + oSrcID.toString() ); } } catch(Exception exc) { LOG.ERROR("Sync source " + oSrcID.toString() + " failed.", exc); if ( src != null && src.m_nErrCode == RhoRuby.ERR_NONE ) src.m_nErrCode = RhoRuby.ERR_RUNTIME; getNotify().fireSyncNotification(src, true, src.m_nErrCode, "" ); } getNotify().cleanCreateObjectErrors(); if ( getState() != esExit ) setState(esNone); } SyncSource findSource(SourceID oSrcID) { for( int i = 0; i < (int)m_sources.size(); i++ ) { SyncSource src = (SyncSource)m_sources.elementAt(i); if ( oSrcID.isEqual(src) ) return src; } return null; } SyncSource findSourceByName(String strSrcName) { SourceID oSrcID = new SourceID(); oSrcID.m_strName = strSrcName; return findSource(oSrcID); } void loadAllSources()throws DBException { m_sources.removeAllElements(); IDBResult res = getDB().executeSQL("SELECT source_id,source_url,token,name from sources ORDER BY priority"); for ( ; !res.isEnd(); res.next() ) { String strDbUrl = res.getStringByIdx(1); if ( strDbUrl.length() == 0 ) continue; String strUrl = strDbUrl.startsWith("http") ? strDbUrl : FilePath.join(RhoConf.getInstance().getPath("syncserver"), strDbUrl); if ( strUrl.charAt(strUrl.length()-1) == '/' || strUrl.charAt(strUrl.length()-1) == '\\' ) strUrl = strUrl.substring(0, strUrl.length()-1); String name = res.getStringByIdx(3); if ( strUrl.length() > 0 ) m_sources.addElement( new SyncSource( res.getIntByIdx(0), strUrl, name, res.getLongByIdx(2), this) ); } } public String loadClientID()throws Exception { String clientID = ""; synchronized( m_mxLoadClientID ) { boolean bResetClient = false; IDBResult res = getDB().executeSQL("SELECT client_id,reset from client_info"); if ( !res.isEnd() ) { clientID = res.getStringByIdx(0); bResetClient = res.getIntByIdx(1) > 0; } if ( clientID.length() == 0 ) { clientID = requestClientIDByNet(); getDB().executeSQL("DELETE FROM client_info"); getDB().executeSQL("INSERT INTO client_info (client_id) values (?)", clientID); }else if ( bResetClient ) { if ( !resetClientIDByNet(clientID) ) { if ( m_sources.size() > 0 ) { SyncSource src = (SyncSource)m_sources.elementAt(getStartSource()); src.m_nErrCode = RhoRuby.ERR_REMOTESERVER; getNotify().fireSyncNotification(src, true, src.m_nErrCode, ""); }else getNotify().fireSyncNotification(null, true, RhoRuby.ERR_REMOTESERVER, ""); stopSync(); } else getDB().executeSQL("UPDATE client_info SET reset=? where client_id=?", new Integer(0), clientID ); } } return clientID; } boolean resetClientIDByNet(String strClientID)throws Exception { String serverUrl = RhoConf.getInstance().getPath("syncserver"); String strUrl = serverUrl + "clientreset"; String strQuery = "?client_id=" + strClientID; strQuery += "&" + ClientRegister.getInstance().getRegisterBody(); NetResponse resp = getNet().pullData(strUrl+strQuery, this); return resp.isOK(); } String requestClientIDByNet()throws Exception { LOG.INFO("Request clientID by Net."); String serverUrl = RhoConf.getInstance().getPath("syncserver"); String strUrl = serverUrl + "clientcreate"; String strQuery = SYNC_SOURCE_FORMAT(); strQuery += "&" + ClientRegister.getInstance().getRegisterBody(); NetResponse resp = getNet().pullData(strUrl+strQuery, this); if ( resp.isOK() && resp.getCharData() != null ) { String szData = resp.getCharData(); JSONEntry oJsonEntry = new JSONEntry(szData); JSONEntry oJsonObject = oJsonEntry.getEntry("client"); if ( !oJsonObject.isEmpty() ) return oJsonObject.getString("client_id"); } return ""; } int getStartSource() { for( int i = 0; i < m_sources.size(); i++ ) { SyncSource src = (SyncSource)m_sources.elementAt(i); if ( !src.isEmptyToken() ) return i; } return 0; } void syncAllSources()throws Exception { boolean bError = false; for( int i = getStartSource(); i < m_sources.size() && isContinueSync(); i++ ) { SyncSource src = null; try{ src = (SyncSource)m_sources.elementAt(i); if ( isSessionExist() && getState() != esStop ) src.sync(); }catch(Exception exc) { if ( src.m_nErrCode == RhoRuby.ERR_NONE ) src.m_nErrCode = RhoRuby.ERR_RUNTIME; setState(esStop); throw exc; }finally{ getNotify().onSyncSourceEnd( i, m_sources ); bError = src.m_nErrCode != RhoRuby.ERR_NONE; } } if ( !bError) getNotify().fireSyncNotification(null, true, RhoRuby.ERR_NONE, RhoRuby.getMessageText("sync_completed")); } void callLoginCallback(String callback, int nErrCode, String strMessage) { try{ String strBody = "error_code=" + nErrCode; strBody += "&error_message=" + URI.urlEncode(strMessage != null? strMessage : ""); strBody += "&rho_callback=1"; String strUrl = getNet().resolveUrl(callback); LOG.INFO( "Login callback: " + callback + ". Body: "+ strBody ); NetResponse resp = getNet().pushData( strUrl, strBody, this ); if ( !resp.isOK() ) LOG.ERROR( "Call Login callback failed. Code: " + resp.getRespCode() + "; Error body: " + resp.getCharData() ); }catch(Exception exc) { LOG.ERROR("Call Login callback failed.", exc); } } boolean checkAllSourcesFromOneDomain()throws Exception { loadAllSources(); if ( m_sources.size() == 0 ) return true; //All sources should be from one domain SyncSource src0 = (SyncSource)m_sources.elementAt(0); String srv0 = getServerFromUrl(src0.getUrl()); for( int i = 1; i < m_sources.size(); i++ ) { SyncSource src = (SyncSource)m_sources.elementAt(i); String srv = getServerFromUrl(src.getUrl()); if ( srv.equals( srv0 ) != true ) return false; } return true; } void login(String name, String password, String callback) { try { if ( !checkAllSourcesFromOneDomain() ) { callLoginCallback(callback, RhoRuby.ERR_DIFFDOMAINSINSYNCSRC, ""); return; } NetResponse resp = null; try{ String serverUrl = RhoConf.getInstance().getPath("syncserver"); String strBody = "login=" + name + "&password=" + password + "&remember_me=1"; resp = getNet().pullCookies( serverUrl+"client_login", strBody, this); if ( resp.isUnathorized() ) { callLoginCallback(callback, RhoRuby.ERR_UNATHORIZED, resp.getCharData()); return; } if ( !resp.isOK() ) { callLoginCallback(callback, RhoRuby.ERR_REMOTESERVER, resp.getCharData()); return; } }catch(IOException exc) { LOG.ERROR("Login failed.", exc); callLoginCallback(callback, RhoRuby.getNetErrorCode(exc), "" ); return; } String strSession = resp.getCharData(); if ( strSession == null || strSession.length() == 0 ) { LOG.ERROR("Return empty session."); callLoginCallback(callback, RhoRuby.ERR_UNEXPECTEDSERVERRESPONSE, "" ); return; } //TODO: move session to client_info table getDB().executeSQL( "UPDATE sources SET session=?", strSession ); //if ( ClientRegister.getInstance() != null ) // ClientRegister.getInstance().stopWait(); callLoginCallback(callback, RhoRuby.ERR_NONE, "" ); }catch(Exception exc) { LOG.ERROR("Login failed.", exc); callLoginCallback(callback, RhoRuby.ERR_RUNTIME, "" ); } } boolean isLoggedIn()throws DBException { int nCount = 0; IDBResult res = getDB().executeSQL("SELECT count(session) FROM sources WHERE session IS NOT NULL"); if ( !res.isEnd() ) nCount = res.getIntByIdx(0); return nCount > 0; } String loadSession()throws DBException { m_strSession = ""; IDBResult res = getDB().executeSQL("SELECT session FROM sources WHERE session IS NOT NULL"); if ( !res.isEnd() ) m_strSession = res.getStringByIdx(0); return m_strSession; } public void logout()throws Exception { getDB().executeSQL( "UPDATE sources SET session = NULL"); m_strSession = ""; loadAllSources(); } public void setSyncServer(String url)throws Exception { RhoConf.getInstance().setPropertyByName("syncserver", url); RhoConf.getInstance().saveToFile(); RhoConf.getInstance().loadConf(); getDB().executeSQL("DELETE FROM client_info"); logout(); } static String getServerFromUrl( String strUrl ) { URI uri = new URI(strUrl); return uri.getHost(); } }