/* * rhodes * * Copyright (C) 2008 Rhomobile, Inc. All rights reserved. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ package com.rho.sync; import com.rho.RhoClassFactory; import com.rho.RhoEmptyLogger; import com.rho.RhoEmptyProfiler; import com.rho.RhoLogger; import com.rho.RhoProfiler; import com.rho.RhodesApp; import com.rho.Tokenizer; import com.rho.net.*; import com.rho.db.*; import java.util.Enumeration; import java.util.Vector; import java.util.Hashtable; import com.rho.FilePath; import com.rho.TimeInterval; import com.rho.RhoAppAdapter; import com.rho.net.NetRequest.MultipartItem; public class SyncSource { private static final RhoLogger LOG = RhoLogger.RHO_STRIP_LOG ? new RhoEmptyLogger() : new RhoLogger("Sync"); private static final RhoProfiler PROF = RhoProfiler.RHO_STRIP_PROFILER ? new RhoEmptyProfiler() : new RhoProfiler(); static class CAttrValue { String m_strAttrib = ""; String m_strValue = ""; String m_strBlobSuffix = ""; CAttrValue(String strAttrib, String strValue) { m_strAttrib = strAttrib; m_strValue = strValue; if ( m_strAttrib.endsWith("-rhoblob") ) { m_strBlobSuffix = "-rhoblob"; m_strAttrib = m_strAttrib.substring(0,m_strAttrib.length()-m_strBlobSuffix.length()); } } }; SyncEngine m_syncEngine; DBAdapter m_dbAdapter; Integer m_nID; String m_strName = ""; long m_token = 0; String m_strSyncType = ""; boolean m_bTokenFromDB; int m_nCurPageCount, m_nInserted, m_nDeleted, m_nTotalCount, m_nAttribCounter=0; boolean m_bGetAtLeastOnePage = false; int m_nErrCode = RhoAppAdapter.ERR_NONE; String m_strError = ""; String m_strErrorType = ""; //String m_strPushBody = ""; Vector/*Ptr*/ m_arSyncBlobs = new Vector(); int m_nRefreshTime = 0; int m_nProgressStep = -1; boolean m_bSchemaSource; static class CAssociation { String m_strSrcName, m_strAttrib; CAssociation( String strSrcName, String strAttrib ){m_strSrcName = strSrcName; m_strAttrib = strAttrib; } }; Vector/**/ m_arAssociations = new Vector(); Vector/*Ptr*/ m_arMultipartItems = new Vector(); Vector/**/ m_arBlobAttrs = new Vector(); Integer getID() { return m_nID; } String getName() { return m_strName; } String getSyncType(){ return m_strSyncType; } int getServerObjectsCount(){ return m_nInserted+m_nDeleted; } long getToken(){ return m_token; } boolean isTokenFromDB(){ return m_bTokenFromDB; } void setToken(long token){ m_token = token; m_bTokenFromDB = false; } boolean isEmptyToken() { return m_token == 0; } int getProgressStep(){ return m_nProgressStep; } void setProgressStep(int nProgressStep){ m_nProgressStep = nProgressStep; } boolean getGetAtLeastOnePage(){ return m_bGetAtLeastOnePage; } int getRefreshTime(){ return m_nRefreshTime; } Vector/**/ getAssociations(){ return m_arAssociations; } int getInsertedCount() { return m_nInserted; } int getDeletedCount() { return m_nDeleted; } void setCurPageCount(int nCurPageCount){m_nCurPageCount = nCurPageCount;} void setTotalCount(int nTotalCount){m_nTotalCount = nTotalCount;} int getCurPageCount(){return m_nCurPageCount;} int getTotalCount(){return m_nTotalCount;} SyncEngine getSync(){ return m_syncEngine; } SyncNotify getNotify(){ return getSync().getNotify(); } NetRequest getNet(){ return getSync().getNet(); } ISyncProtocol getProtocol(){ return getSync().getProtocol(); } void setRefreshTime( int nRefreshTime ){ m_nRefreshTime = nRefreshTime;} DBAdapter getDB(){ return m_dbAdapter; } SyncSource(SyncEngine syncEngine, DBAdapter db)throws DBException { m_syncEngine = syncEngine; m_dbAdapter = db; m_nID = new Integer(0); m_bTokenFromDB = true; m_nCurPageCount = 0; m_nInserted = 0; m_nDeleted = 0; m_nTotalCount = 0; m_bGetAtLeastOnePage = false; m_nErrCode = RhoAppAdapter.ERR_NONE; m_bSchemaSource = db.isTableExist(m_strName); } SyncSource(int id, String name, String strSyncType, DBAdapter db, SyncEngine syncEngine )throws DBException { m_syncEngine = syncEngine; m_dbAdapter = db; m_nID = new Integer(id); m_strName = name; m_strSyncType = strSyncType; m_nCurPageCount = 0; m_nInserted = 0; m_nDeleted = 0; m_nTotalCount = 0; m_bGetAtLeastOnePage = false; m_nErrCode = RhoAppAdapter.ERR_NONE; IDBResult res = db.executeSQL("SELECT token,associations from sources WHERE source_id=?", m_nID); if ( !res.isOneEnd() ) { m_token = res.getLongByIdx(0); m_bTokenFromDB = true; }else { m_token = 0; m_bTokenFromDB = true; } m_bSchemaSource = db.isTableExist(m_strName); parseAssociations(res.getStringByIdx(1)); } void parseAssociations(String strAssociations) { if (strAssociations.length() == 0 ) return; Tokenizer oTokenizer = new Tokenizer( strAssociations, "," ); String strSrcName = ""; while (oTokenizer.hasMoreTokens()) { String tok = oTokenizer.nextToken(); if (tok.length() == 0) continue; if ( strSrcName.length() > 0 ) { m_arAssociations.addElement( new CAssociation(strSrcName, tok) ); strSrcName = ""; }else strSrcName = tok; } } void sync() throws Exception { getNotify().reportSyncStatus(RhoAppAdapter.getMessageText("syncronizing") + getName() + "...", m_nErrCode, m_strError ); TimeInterval startTime = TimeInterval.getCurrentTime(); //m_bIsSearch = false; try{ if ( isTokenFromDB() && getToken() > 1 ) syncServerChanges(); //sync only server changes, which was paused before else { if ( isEmptyToken() ) processToken(1); boolean bSyncedServer = syncClientChanges(); if ( !bSyncedServer ) syncServerChanges(); } }catch(Exception exc) { getSync().stopSync(); throw exc; }finally{ TimeInterval endTime = TimeInterval.getCurrentTime(); getDB().executeSQL("UPDATE sources set last_updated=?,last_inserted_size=?,last_deleted_size=?, "+ "last_sync_duration=?,last_sync_success=?, backend_refresh_time=? WHERE source_id=?", new Long(endTime.toULong()/1000), new Integer(getInsertedCount()), new Integer(getDeletedCount()), new Long((endTime.minus(startTime)).toULong()), new Integer(m_bGetAtLeastOnePage?1:0), new Integer(m_nRefreshTime), getID() ); } } boolean syncClientChanges()throws Exception { boolean bSyncedServer = false; if ( isPendingClientChanges() ) { LOG.INFO( "Client has unconfirmed created items. Call server to update them." ); syncServerChanges(); bSyncedServer = true; } if ( bSyncedServer && isPendingClientChanges() ) { LOG.INFO( "Server does not sent created items. Stop sync." ); getSync().setState(SyncEngine.esStop); } else { PROF.START("Pull"); boolean bSyncClient = false; { IDBResult res = getDB().executeSQL("SELECT object FROM changed_values WHERE source_id=? LIMIT 1 OFFSET 0", getID()); bSyncClient = !res.isOneEnd(); } if ( bSyncClient ) { doSyncClientChanges(); bSyncedServer = false; } PROF.STOP("Pull"); } return bSyncedServer; } boolean isPendingClientChanges()throws DBException { IDBResult res = getDB().executeSQL("SELECT object FROM changed_values WHERE source_id=? and update_type='create' and sent>1 LIMIT 1 OFFSET 0", getID()); return !res.isOneEnd(); } void doSyncClientChanges()throws Exception { String arUpdateTypes[] = {"create", "update", "delete"}; boolean arUpdateSent[] = {false, false, false}; m_arMultipartItems.removeAllElements(); m_arBlobAttrs.removeAllElements(); String strBody = "{\"source_name\":" + JSONEntry.quoteValue(getName()) + ",\"client_id\":" + JSONEntry.quoteValue(getSync().getClientID()); boolean bSend = false; int i = 0; for( i = 0; i < 3 && getSync().isContinueSync(); i++ ) { String strBody1; strBody1 = makePushBody_Ver3(arUpdateTypes[i], true); if (strBody1.length() > 0) { strBody += "," + strBody1; String strBlobAttrs = ""; for ( int j = 0; j < (int)m_arBlobAttrs.size(); j++) { if ( strBlobAttrs.length() > 0 ) strBlobAttrs += ","; strBlobAttrs += JSONEntry.quoteValue((String)m_arBlobAttrs.elementAt(j)); } if ( strBlobAttrs.length() > 0 ) strBody += ",\"blob_fields\":[" + strBlobAttrs + "]"; arUpdateSent[i] = true; bSend = true; } } strBody += "}"; if ( bSend ) { LOG.INFO( "Push client changes to server. Source: " + getName() + "Size :" + strBody.length() ); LOG.TRACE("Push body: " + strBody); try{ if ( m_arMultipartItems.size() > 0 ) { MultipartItem oItem = new MultipartItem(); oItem.m_strBody = strBody; //oItem.m_strContentType = getProtocol().getContentType(); oItem.m_strName = "cud"; m_arMultipartItems.addElement(oItem); NetResponse resp = getNet().pushMultipartData( getProtocol().getClientChangesUrl(), m_arMultipartItems, getSync(), null ); if ( !resp.isOK() ) { getSync().setState(SyncEngine.esStop); m_nErrCode = RhoAppAdapter.ERR_REMOTESERVER; m_strError = resp.getCharData(); } }else { NetResponse resp = getNet().pushData( getProtocol().getClientChangesUrl(), strBody, getSync()); if ( !resp.isOK() ) { getSync().setState(SyncEngine.esStop); m_nErrCode = RhoAppAdapter.ERR_REMOTESERVER; m_strError = resp.getCharData(); } } }catch(Exception exc) { m_nErrCode = RhoAppAdapter.getNetErrorCode(exc); throw exc; } } for( i = 0; i < 3 && getSync().isContinueSync(); i++ ) { if ( arUpdateSent[i] ) { //oo conflicts if ( i < 1 ) //create getDB().executeSQL("UPDATE changed_values SET sent=2 WHERE source_id=? and update_type=? and sent=1", getID(), arUpdateTypes[i] ); else // getDB().executeSQL("DELETE FROM changed_values WHERE source_id=? and update_type=? and sent=1", getID(), arUpdateTypes[i] ); } } m_arMultipartItems.removeAllElements(); m_arBlobAttrs.removeAllElements(); } //{"source_name":"SampleAdapter","client_id":1,"create":{"1":{"brand":"Apple","name":"iPhone","price":"199.99"}}} //{"source_name":"SampleAdapter","client_id":1,"update":{"1":{"brand":"Apple","name":"iPhone","price":"199.99"}}} //{"source_name":"SampleAdapter","client_id":1,"delete":{"1":{"brand":"Apple","name":"iPhone","price":"199.99"}}} //{"source_name":"SampleAdapter","client_id":1,"delete":{"3":{"brand":"HTC","name":"Fuze","price":"299.99"}},"create":{"1":{"brand":"Apple","name":"iPhone","price":"199.99"}},"update":{"2":{"brand":"Android","name":"G2","price":"99.99"}}} String makePushBody_Ver3( String strUpdateType, boolean isSync)throws DBException { String strBody = ""; getDB().Lock(); if ( isSync ) getDB().updateAllAttribChanges(); IDBResult res = getDB().executeSQL("SELECT attrib, object, value, attrib_type "+ "FROM changed_values where source_id=? and update_type =? and sent<=1 ORDER BY object", getID(), strUpdateType ); if ( res.isEnd() ) { res.close(); getDB().Unlock(); return strBody; } String strCurObject = ""; boolean bFirst = true; for( ; !res.isEnd(); res.next() ) { String strAttrib = res.getStringByIdx(0); String strObject = res.getStringByIdx(1); String value = res.getStringByIdx(2); String attribType = res.getStringByIdx(3); if ( attribType.compareTo("blob.file") == 0 ) { MultipartItem oItem = new MultipartItem(); oItem.m_strFilePath = RhodesApp.getInstance().resolveDBFilesPath(value); oItem.m_strContentType = "application/octet-stream"; oItem.m_strName = strAttrib + "-" + strObject; m_arBlobAttrs.addElement(strAttrib); m_arMultipartItems.addElement(oItem); } if ( strBody.length() == 0 ) { if ( !isSync ) strBody += "{"; else strBody += "\"" + strUpdateType + "\":{"; } if ( strObject.compareTo(strCurObject) != 0 ) { if ( strCurObject.length() > 0 ) { if ( !bFirst ) strBody += "}"; strBody += ","; } bFirst = true; strBody += JSONEntry.quoteValue(strObject); strCurObject = strObject; } if (!bFirst) strBody += ","; if ( strAttrib.length() > 0 ) { if ( bFirst ) strBody += ":{"; strBody += JSONEntry.quoteValue(strAttrib) + ":" + JSONEntry.quoteValue(value); bFirst = false; } } if ( strBody.length() > 0 ) { if ( !bFirst ) strBody += "}"; strBody += "}"; } if ( isSync ) getDB().executeSQL("UPDATE changed_values SET sent=1 WHERE source_id=? and update_type=? and sent=0", getID(), strUpdateType ); getDB().Unlock(); return strBody; } void applyChangedValues()throws Exception { String strBody = makePushBody_Ver3("create", false); if ( strBody != null && strBody.length() > 0 ) { JSONEntry oEntry = new JSONEntry(strBody); processSyncCommand("insert", oEntry ); } strBody = makePushBody_Ver3("delete", false); if ( strBody != null && strBody.length() > 0 ) { JSONEntry oEntry = new JSONEntry(strBody); processSyncCommand("delete", oEntry ); } strBody = makePushBody_Ver3("update", false); if ( strBody != null && strBody.length() > 0 ) { JSONEntry oEntry = new JSONEntry(strBody); processSyncCommand("insert", oEntry ); } } void syncServerChanges()throws Exception { LOG.INFO("Sync server changes source ID :" + getID() ); while( getSync().isContinueSync() ) { setCurPageCount(0); String strUrl = getProtocol().getServerQueryUrl(""); String strQuery = getProtocol().getServerQueryBody(getName(), getSync().getClientID(), getSync().getSyncPageSize()); if ( !m_bTokenFromDB && getToken() > 1 ) strQuery += "&token=" + getToken(); LOG.INFO( "Pull changes from server. Url: " + (strUrl+strQuery) ); NetResponse resp = null; try{ PROF.START("Net"); resp = getNet().pullData(strUrl+strQuery, getSync()); PROF.STOP("Net"); if ( !resp.isOK() ) { getSync().stopSync(); m_nErrCode = RhoAppAdapter.getErrorFromResponse(resp); m_strError = resp.getCharData(); continue; } }catch(Exception exc) { m_nErrCode = RhoAppAdapter.getNetErrorCode(exc); throw exc; } String szData = resp.getCharData(); //String szData = "[{\"version\":3},{\"token\":\"\"},{\"count\":0},{\"progress_count\":28},{\"total_count\":28},{\"source-error\":{\"login-error\":{\"message\":\"s currently connected from another machine\"}}}]"; //String szData = "[{\"version\":3},{\"token\":\"\"},{\"count\":0},{\"progress_count\":0},{\"total_count\":0},{\"create-error\":{\"0_broken_object_id\":{\"name\":\"wrongname\",\"an_attribute\":\"error create\"},\"0_broken_object_id-error\":{\"message\":\"error create\"}}}]"; //String szData = "[{\"version\":3},{\"token\":\"35639160294387\"},{\"count\":3},{\"progress_count\":0},{\"total_count\":3},{\"metadata\":\"{\\\"foo\\\":\\\"bar\\\"}\",\"insert\":{\"1\":{\"price\":\"199.99\",\"brand\":\"Apple\",\"name\":\"iPhone\"}}}]"; //String szData = "[{\"version\":3},{\"token\":\"\"},{\"count\":0},{\"progress_count\":1},{\"total_count\":1},{\"update-error\":{\"1-error\":{\"message\":\"Update failed!\"},\"1\":{\"foo\":\"bar5\"}}}]"; PROF.START("Parse"); JSONArrayIterator oJsonArr = new JSONArrayIterator(szData); PROF.STOP("Parse"); processServerResponse_ver3(oJsonArr); if (getSync().getSourceOptions().getBoolProperty(getID(), "pass_through")) processToken(0); if ( getToken() == 0 ) break; } if ( getSync().isSchemaChanged() ) getSync().stopSync(); } boolean processServerErrors(JSONEntry oCmds)throws Exception { if ( oCmds.hasName("source-error") ) { JSONEntry errSrc = oCmds.getEntry("source-error"); JSONStructIterator errIter = new JSONStructIterator(errSrc); for( ; !errIter.isEnd(); errIter.next() ) { m_nErrCode = RhoAppAdapter.ERR_CUSTOMSYNCSERVER; m_strError = errIter.getCurValue().getString("message"); m_strErrorType = errIter.getCurKey(); } }else if ( oCmds.hasName("search-error") ) { JSONEntry errSrc = oCmds.getEntry("search-error"); JSONStructIterator errIter = new JSONStructIterator(errSrc); for( ; !errIter.isEnd(); errIter.next() ) { m_nErrCode = RhoAppAdapter.ERR_CUSTOMSYNCSERVER; m_strError = errIter.getCurValue().getString("message"); m_strErrorType = errIter.getCurKey(); } }else if ( oCmds.hasName("create-error") ) { m_nErrCode = RhoAppAdapter.ERR_CUSTOMSYNCSERVER; m_strErrorType = "create-error"; JSONEntry errSrc = oCmds.getEntry(m_strErrorType); JSONStructIterator errIter = new JSONStructIterator(errSrc); for( ; !errIter.isEnd(); errIter.next() ) { String strKey = errIter.getCurKey(); if ( strKey.endsWith("-error") ) m_strError = errIter.getCurValue().getString("message"); } }else if ( oCmds.hasName("update-error") ) { m_nErrCode = RhoAppAdapter.ERR_CUSTOMSYNCSERVER; m_strErrorType = "update-error"; JSONEntry errSrc = oCmds.getEntry(m_strErrorType); JSONStructIterator errIter = new JSONStructIterator(errSrc); for( ; !errIter.isEnd(); errIter.next() ) { String strKey = errIter.getCurKey(); if ( strKey.endsWith("-error") ) m_strError = errIter.getCurValue().getString("message"); } }else if ( oCmds.hasName("delete-error") ) { m_nErrCode = RhoAppAdapter.ERR_CUSTOMSYNCSERVER; m_strErrorType = "delete-error"; JSONEntry errSrc = oCmds.getEntry(m_strErrorType); JSONStructIterator errIter=new JSONStructIterator(errSrc); for( ; !errIter.isEnd(); errIter.next() ) { String strKey = errIter.getCurKey(); if ( strKey.endsWith( "-error") ) m_strError = errIter.getCurValue().getString("message"); } }else return false; return true; } void processServerResponse_ver3(JSONArrayIterator oJsonArr)throws Exception { PROF.START("Data1"); int nVersion = 0; if ( !oJsonArr.isEnd() && oJsonArr.getCurItem().hasName("version") ) { nVersion = oJsonArr.getCurItem().getInt("version"); oJsonArr.next(); } if ( nVersion != getProtocol().getVersion() ) { LOG.ERROR("Sync server send data with incompatible version. Client version: " + getProtocol().getVersion() + "; Server response version: " + nVersion + ". Source name: " + getName() ); getSync().stopSync(); m_nErrCode = RhoAppAdapter.ERR_UNEXPECTEDSERVERRESPONSE; return; } if ( !oJsonArr.isEnd() && oJsonArr.getCurItem().hasName("token")) { processToken(oJsonArr.getCurItem().getUInt64("token")); oJsonArr.next(); } if ( !oJsonArr.isEnd() && oJsonArr.getCurItem().hasName("source") ) { //skip it. it uses in search only oJsonArr.next(); } if ( !oJsonArr.isEnd() && oJsonArr.getCurItem().hasName("count") ) { setCurPageCount(oJsonArr.getCurItem().getInt("count")); oJsonArr.next(); } if ( !oJsonArr.isEnd() && oJsonArr.getCurItem().hasName("refresh_time") ) { setRefreshTime(oJsonArr.getCurItem().getInt("refresh_time")); oJsonArr.next(); } if ( !oJsonArr.isEnd() && oJsonArr.getCurItem().hasName("progress_count") ) { //TODO: progress_count //setTotalCount(oJsonArr.getCurItem().getInt("progress_count")); oJsonArr.next(); } if ( !oJsonArr.isEnd() && oJsonArr.getCurItem().hasName("total_count") ) { setTotalCount(oJsonArr.getCurItem().getInt("total_count")); oJsonArr.next(); } //if ( getServerObjectsCount() == 0 ) // getNotify().fireSyncNotification(this, false, RhoAppAdapter.ERR_NONE, ""); if ( getToken() == 0 ) { //oo conflicts getDB().executeSQL("DELETE FROM changed_values where source_id=? and sent>=3", getID() ); // } LOG.INFO("Got " + getCurPageCount() + "(Processed: " + getServerObjectsCount() + ") records of " + getTotalCount() + " from server. Source: " + getName() + ". Version: " + nVersion ); PROF.STOP("Data1"); if ( !oJsonArr.isEnd() && getSync().isContinueSync() ) { JSONEntry oCmds = oJsonArr.getCurItem(); PROF.START("Data"); if ( oCmds.hasName("schema-changed") ) { getSync().setSchemaChanged(true); }else if ( !processServerErrors(oCmds) ) { getDB().startTransaction(); if (getSync().getSourceOptions().getBoolProperty(getID(), "pass_through")) { if ( m_bSchemaSource ) getDB().executeSQL( "DELETE FROM " + getName() ); else getDB().executeSQL( "DELETE FROM object_values WHERE source_id=?", getID() ); } if ( oCmds.hasName("metadata") && getSync().isContinueSync() ) { String strMetadata = oCmds.getString("metadata"); getDB().executeSQL("UPDATE sources SET metadata=? WHERE source_id=?", strMetadata, getID() ); } if ( oCmds.hasName("links") && getSync().isContinueSync() ) processSyncCommand("links", oCmds.getEntry("links") ); if ( oCmds.hasName("delete") && getSync().isContinueSync() ) processSyncCommand("delete", oCmds.getEntry("delete") ); if ( oCmds.hasName("insert") && getSync().isContinueSync() ) processSyncCommand("insert", oCmds.getEntry("insert") ); PROF.STOP("Data"); PROF.START("DB"); getDB().endTransaction(); PROF.STOP("DB"); getNotify().fireObjectsNotification(); } } PROF.START("Data1"); if ( getCurPageCount() > 0 ) getNotify().fireSyncNotification(this, false, RhoAppAdapter.ERR_NONE, ""); PROF.STOP("Data1"); } void processSyncCommand(String strCmd, JSONEntry oCmdEntry)throws Exception { JSONStructIterator objIter = new JSONStructIterator(oCmdEntry); for( ; !objIter.isEnd() && getSync().isContinueSync(); objIter.next() ) { String strObject = objIter.getCurKey(); JSONStructIterator attrIter = new JSONStructIterator( objIter.getCurValue() ); try { if ( m_bSchemaSource ) processServerCmd_Ver3_Schema(strCmd,strObject,attrIter); else { for( ; !attrIter.isEnd() && getSync().isContinueSync(); attrIter.next() ) { String strAttrib = attrIter.getCurKey(); String strValue = attrIter.getCurString(); processServerCmd_Ver3(strCmd,strObject,strAttrib,strValue); } } }catch(DBException exc) { LOG.ERROR("Sync of server changes failed for " + getName() + ";object: " + strObject, exc); } if ( getSyncType().compareTo("none") == 0 ) continue; int nSyncObjectCount = getNotify().incLastSyncObjectCount(getID()); if ( getProgressStep() > 0 && (nSyncObjectCount%getProgressStep() == 0) ) getNotify().fireSyncNotification(this, false, RhoAppAdapter.ERR_NONE, ""); if ( getDB().isUIWaitDB() ) { LOG.INFO("Commit transaction because of UI request."); getDB().endTransaction(); SyncThread.getInstance().sleep(1000); getDB().startTransaction(); } } } void processAssociations(String strOldObject, String strNewObject)throws Exception { for ( int i = 0; i < m_arAssociations.size(); i++ ) { SyncSource pSrc = getSync().findSourceByName( ((CAssociation)m_arAssociations.elementAt(i)).m_strSrcName); if ( pSrc != null ) pSrc.updateAssociation(strOldObject, strNewObject, ((CAssociation)m_arAssociations.elementAt(i)).m_strAttrib); } } void updateAssociation(String strOldObject, String strNewObject, String strAttrib)throws Exception { if ( m_bSchemaSource ) { String strSqlUpdate = "UPDATE "; strSqlUpdate += getName() + " SET " + strAttrib + "=? where " + strAttrib + "=?"; getDB().executeSQL(strSqlUpdate, strNewObject, strOldObject ); } else getDB().executeSQL("UPDATE object_values SET value=? where attrib=? and source_id=? and value=?", strNewObject, strAttrib, getID(), strOldObject ); getDB().executeSQL("UPDATE changed_values SET value=? where attrib=? and source_id=? and value=?", strNewObject, strAttrib, getID(), strOldObject ); } void processServerCmd_Ver3_Schema(String strCmd, String strObject, JSONStructIterator attrIter)throws Exception { if ( strCmd.compareTo("insert") == 0 ) { Vector/**/ vecValues = new Vector(), vecAttrs = new Vector(); String strCols = "", strQuest = "", strSet = ""; for( ; !attrIter.isEnd() && getSync().isContinueSync(); attrIter.next() ) { CAttrValue oAttrValue = new CAttrValue(attrIter.getCurKey(),attrIter.getCurString()); if ( !processBlob(strCmd,strObject,oAttrValue) ) continue; if ( strCols.length() > 0 ) strCols += ","; if ( strQuest.length() > 0) strQuest += ","; if ( strSet.length() > 0) strSet += ","; strCols += oAttrValue.m_strAttrib; strQuest += "?"; strSet += oAttrValue.m_strAttrib + "=?"; vecAttrs.addElement(oAttrValue.m_strAttrib); vecValues.addElement(oAttrValue.m_strValue); } vecValues.addElement(strObject); if ( strCols.length() > 0 ) strCols += ","; if ( strQuest.length() > 0) strQuest += ","; strCols += "object"; strQuest += "?"; String strSqlInsert = "INSERT INTO "; strSqlInsert += getName() + " ("; strSqlInsert += strCols + ") VALUES(" + strQuest + ")"; if ( !getSync().isContinueSync() ) return; IDBResult resInsert = getDB().executeSQLReportNonUniqueEx(strSqlInsert, vecValues ); if ( resInsert.isNonUnique() ) { String strSqlUpdate = "UPDATE "; strSqlUpdate += getName() + " SET " + strSet + " WHERE object=?"; getDB().executeSQLEx(strSqlUpdate, vecValues); if ( getSyncType().compareTo("none") != 0 ) { // oo conflicts for( int i = 0; i < (int)vecAttrs.size(); i++ ) { getDB().executeSQL("UPDATE changed_values SET sent=4 where object=? and attrib=? and source_id=? and sent>1", strObject, vecAttrs.elementAt(i), getID() ); } // } } if ( getSyncType().compareTo("none") != 0 ) getNotify().onObjectChanged(getID(),strObject, SyncNotify.enUpdate); m_nInserted++; }else if (strCmd.compareTo("delete") == 0) { Vector/**/ vecAttrs = new Vector(); String strSet = ""; for( ; !attrIter.isEnd() && getSync().isContinueSync(); attrIter.next() ) { CAttrValue oAttrValue = new CAttrValue(attrIter.getCurKey(),attrIter.getCurString()); if ( strSet.length() > 0 ) strSet += ","; vecAttrs.addElement(oAttrValue.m_strAttrib); strSet += oAttrValue.m_strAttrib + "=NULL"; } String strSqlUpdate = "UPDATE "; strSqlUpdate += getName() + " SET " + strSet + " WHERE object=?"; if ( strSet.length() == 0 || !getSync().isContinueSync() ) return; getDB().executeSQL(strSqlUpdate, strObject); //Remove item if all nulls String strSelect = "SELECT * FROM " + getName() + " WHERE object=?"; IDBResult res = getDB().executeSQL( strSelect, strObject ); if ( !res.isOneEnd() ) { boolean bAllNulls = true; for( int i = 0; i < res.getColCount(); i ++) { if ( !res.isNullByIdx(i) && res.getColName(i).compareTo("object")!=0 ) { bAllNulls = false; break; } } if (bAllNulls) { String strDelete = "DELETE FROM " + getName() + " WHERE object=?"; getDB().executeSQL( strDelete, strObject); } } if ( getSyncType().compareTo("none") != 0 ) { getNotify().onObjectChanged(getID(), strObject, SyncNotify.enDelete); // oo conflicts for( int i = 0; i < (int)vecAttrs.size(); i++ ) { getDB().executeSQL("UPDATE changed_values SET sent=3 where object=? and attrib=? and source_id=?", strObject, vecAttrs.elementAt(i), getID() ); } // } m_nDeleted++; }else if ( strCmd.compareTo("links") == 0 ) { String strValue = attrIter.getCurString(); processAssociations(strObject, strValue); String strSqlUpdate = "UPDATE "; strSqlUpdate += getName() + " SET object=? WHERE object=?"; getDB().executeSQL(strSqlUpdate, strValue, strObject); getDB().executeSQL("UPDATE changed_values SET object=?,sent=3 where object=? and source_id=?", strValue, strObject, getID() ); getNotify().onObjectChanged(getID(), strObject, SyncNotify.enCreate); } } boolean processBlob( String strCmd, String strObject, CAttrValue oAttrValue )throws Exception { //TODO: when server return delete with rhoblob postfix - delete isBlobAttr if ( !(oAttrValue.m_strBlobSuffix.length() > 0 || getDB().getAttrMgr().isBlobAttr(getID(), oAttrValue.m_strAttrib)) ) return true; boolean bDownload = true; String strDbValue = ""; if ( !getDB().getAttrMgr().isOverwriteBlobFromServer(getID(), oAttrValue.m_strAttrib) ) { if ( m_bSchemaSource ) { String strSelect = "SELECT " + oAttrValue.m_strAttrib + " FROM " + getName() + " WHERE object=?"; IDBResult res = getDB().executeSQL( strSelect, strObject); if (!res.isOneEnd()) { strDbValue = res.getStringByIdx(0); bDownload = strDbValue == null || strDbValue.length() == 0; } }else { IDBResult res = getDB().executeSQL( "SELECT value FROM object_values WHERE object=? and attrib=? and source_id=?", strObject, oAttrValue.m_strAttrib, getID() ); if (!res.isOneEnd()) { strDbValue = res.getStringByIdx(0); bDownload = strDbValue == null || strDbValue.length() == 0; } } } if ( bDownload ) { boolean bRes = false; getDB().endTransaction(); try{ bRes = downloadBlob(oAttrValue); }finally { getDB().startTransaction(); } return bRes; } String fName = makeFileName( oAttrValue ); String fOldName = RhodesApp.getInstance().resolveDBFilesPath(strDbValue); RhoClassFactory.createFile().renameOverwrite(fOldName, fName); oAttrValue.m_strValue = FilePath.getRelativePath( fName, RhodesApp.getInstance().getRhoRootPath()); return true; } void processServerCmd_Ver3(String strCmd, String strObject, String strAttriba, String strValuea)throws Exception { CAttrValue oAttrValue = new CAttrValue(strAttriba,strValuea); if ( strCmd.compareTo("insert") == 0 ) { if ( !processBlob(strCmd,strObject,oAttrValue) ) return; IDBResult resInsert = getDB().executeSQLReportNonUnique("INSERT INTO object_values "+ "(attrib, source_id, object, value) VALUES(?,?,?,?)", oAttrValue.m_strAttrib, getID(), strObject, oAttrValue.m_strValue ); if ( resInsert.isNonUnique() ) { getDB().executeSQL("UPDATE object_values " + "SET value=? WHERE object=? and attrib=? and source_id=?", oAttrValue.m_strValue, strObject, oAttrValue.m_strAttrib, getID() ); if ( getSyncType().compareTo("none") != 0 ) { // oo conflicts getDB().executeSQL("UPDATE changed_values SET sent=4 where object=? and attrib=? and source_id=? and sent>1", strObject, oAttrValue.m_strAttrib, getID() ); // } } if ( getSyncType().compareTo("none") != 0 ) getNotify().onObjectChanged(getID(),strObject, SyncNotify.enUpdate); m_nInserted++; }else if (strCmd.compareTo("delete") == 0) { getDB().executeSQL("DELETE FROM object_values where object=? and attrib=? and source_id=?", strObject, oAttrValue.m_strAttrib, getID() ); if ( getSyncType().compareTo("none") != 0 ) { getNotify().onObjectChanged(getID(), strObject, SyncNotify.enDelete); // oo conflicts getDB().executeSQL("UPDATE changed_values SET sent=3 where object=? and attrib=? and source_id=?", strObject, oAttrValue.m_strAttrib, getID() ); // } m_nDeleted++; }else if ( strCmd.compareTo("links") == 0 ) { processAssociations(strObject, oAttrValue.m_strValue); getDB().executeSQL("UPDATE object_values SET object=? where object=? and source_id=?", oAttrValue.m_strValue, strObject, getID() ); getDB().executeSQL("UPDATE changed_values SET object=?,sent=3 where object=? and source_id=?", oAttrValue.m_strValue, strObject, getID() ); getNotify().onObjectChanged(getID(), strObject, SyncNotify.enCreate); } } private String makeFileName(CAttrValue value)throws Exception { String strExt = ""; URI uri = new URI(value.m_strValue); String strQuest = uri.getQueryString(); if (strQuest != null && strQuest.length() > 0) { int nExt = strQuest.indexOf("extension="); if ( nExt >= 0 ) { int nExtEnd = strQuest.indexOf("&", nExt); if (nExtEnd < 0 ) nExtEnd = strQuest.length(); strExt = strQuest.substring(nExt+10, nExtEnd); } } if ( strExt.length() == 0 ) { String strFileName = uri.getLastNamePart(); int nExt = strFileName != null ? strFileName.lastIndexOf('.') : -1; if ( nExt >= 0 ) strExt = strFileName.substring(nExt); } if ( strExt.length() == 0 ) strExt = ".bin"; else if ( strExt.charAt(0) != '.' ) strExt = "." + strExt; String fName = RhodesApp.getInstance().getBlobsDirPath() + "/id_" + TimeInterval.getCurrentTime().toULong() + strExt; return fName; /* String strExt = ".bin"; URI uri = new URI(value.m_strValue); int nDot = uri.getPath().lastIndexOf('.'); if ( nDot >= 0 ) strExt = uri.getPath().substring(nDot); else{ int nExt = uri.getQueryString().indexOf("extension="); if ( nExt >= 0 ){ int nExtEnd = uri.getQueryString().indexOf("&", nExt); if (nExtEnd < 0 ) nExtEnd = uri.getQueryString().length(); strExt = uri.getQueryString().substring(nExt+10, nExtEnd); } } String fName = RhodesApp.getInstance().getBlobsDirPath() + "/id_" + TimeInterval.getCurrentTime().toULong() + strExt; return fName;*/ } boolean downloadBlob(CAttrValue value)throws Exception { String fName = makeFileName( value ); String url = value.m_strValue; int nQuest = url.lastIndexOf('?'); if ( nQuest > 0 ) url += "&"; else url += "?"; url += "client_id=" + getSync().getClientID(); try{ NetResponse resp = getNet().pullFile(url, fName, getSync(), null); if ( !resp.isOK() ) { getSync().stopSync(); m_nErrCode = RhoAppAdapter.getErrorFromResponse(resp); return false; } }catch(Exception exc) { m_nErrCode = RhoAppAdapter.getNetErrorCode(exc); throw exc; } value.m_strValue = FilePath.getRelativePath( fName, RhodesApp.getInstance().getRhoRootPath()); return true; } void processToken(long token)throws DBException { if ( token > 1 && getToken() == token ){ //Delete non-confirmed records setToken( token ); //For m_bTokenFromDB = false; //getDB().executeSQL("DELETE FROM object_values where source_id=? and token=?", getID(), token ); //TODO: add special table for id,token }else { setToken( token ); getDB().executeSQL("UPDATE sources SET token=? where source_id=?", new Long(token), getID() ); } } }