platform/shared/rubyJVM/src/com/rho/sync/SyncNotify.java in rhodes-1.2.2 vs platform/shared/rubyJVM/src/com/rho/sync/SyncNotify.java in rhodes-1.4.0

- old
+ new

@@ -17,11 +17,14 @@ new RhoLogger("Sync"); static class SyncNotification { String m_strUrl, m_strParams; - SyncNotification(String strUrl, String strParams){ m_strUrl = strUrl; m_strParams = strParams; } + boolean m_bRemoveAfterFire; + + SyncNotification(String strUrl, String strParams, boolean bRemoveAfterFire) + { m_strUrl = strUrl; m_strParams = strParams; m_bRemoveAfterFire = bRemoveAfterFire; } }; public static final Integer enNone = new Integer(0), enDelete=new Integer(1), enUpdate=new Integer(2), enCreate=new Integer(3); SyncEngine m_syncEngine; @@ -33,10 +36,14 @@ Hashtable/*<int,int>*/ m_hashSrcObjectCount = new Hashtable(); static Mutex m_mxObjectNotify = new Mutex(); Hashtable/*<int,SyncNotification>*/ m_mapSyncNotifications = new Hashtable(); + Hashtable/*<int,SyncNotification>*/ m_mapSearchNotifications = new Hashtable(); + + SyncNotification m_initialSyncNotify; + Mutex m_mxSyncNotifications = new Mutex(); ISyncStatusListener m_syncStatusListener = null; SyncEngine getSync(){ return m_syncEngine; } DBAdapter getDB(){ return getSync().getDB(); } @@ -140,11 +147,11 @@ if ( !res.isEnd() ) nNotifyType = enUpdate.intValue(); } if ( strBody.length() > 0 ) - strBody += '&'; + strBody += "&rho_callback=1&"; if (nNotifyType == enDelete.intValue() ) { strBody += "deleted[][object]=" + strObject; strBody += "&deleted[][source_id]=" + nSrcID; @@ -164,11 +171,11 @@ if ( strBody.length() == 0 ) return; } - NetResponse resp = getNet().pushData( strUrl, strBody, getSync() ); + NetResponse resp = getNet().pushData( strUrl, strBody, null ); if ( !resp.isOK() ) LOG.ERROR( "Fire object notification failed. Code: " + resp.getRespCode() + "; Error body: " + resp.getCharData() ); } @@ -267,59 +274,159 @@ if ( strFullUrl.length() > 0 ) { IDBResult res = getDB().executeSQL("SELECT source_id from sources order by source_id"); for ( ; !res.isEnd(); res.next() ) - m_mapSyncNotifications.put( new Integer(res.getIntByIdx(0)), new SyncNotification( strFullUrl, strParams ) ); + m_mapSyncNotifications.put( new Integer(res.getIntByIdx(0)), new SyncNotification( strFullUrl, strParams, false ) ); } } LOG.INFO( " Done Set notification for all sources; Url :" + strFullUrl + "; Params: " + strParams ); }else { //clearSyncNotification(source_id); if ( strFullUrl.length() > 0 ) { synchronized(m_mxSyncNotifications){ - m_mapSyncNotifications.put(new Integer(source_id),new SyncNotification( strFullUrl, strParams ) ); + m_mapSyncNotifications.put(new Integer(source_id),new SyncNotification( strFullUrl, strParams, true ) ); } LOG.INFO( " Done Set notification. Source ID: " + source_id + "; Url :" + strFullUrl + "; Params: " + strParams ); } } } - public void setSyncStatusListener(ISyncStatusListener listener) { m_syncStatusListener = listener; } - private void reportSyncStatus(String status, int error, String strDetails) { - if (m_syncStatusListener != null) { - if ( strDetails.length() == 0 ) - strDetails = RhoRuby.getErrorText(error); - status += (strDetails.length() > 0 ? " Details: " + strDetails: ""); - - LOG.INFO("Status: "+status); - - m_syncStatusListener.reportStatus( status, error); + void setSearchNotification(int source_id, String strUrl, String strParams )throws Exception + { + LOG.INFO( "Set search notification. Source ID: " + source_id + "; Url :" + strUrl + "; Params: " + strParams ); + String strFullUrl = getNet().resolveUrl(strUrl); + + if ( strFullUrl.length() > 0 ) + { + synchronized(m_mxSyncNotifications){ + m_mapSearchNotifications.put(new Integer(source_id),new SyncNotification( strFullUrl, strParams, true ) ); + } + LOG.INFO( " Done Set search notification. Source ID: " + source_id + "; Url :" + strFullUrl + "; Params: " + strParams ); + } + } + + void setInitialSyncNotification(String strUrl, String strParams )throws Exception + { + String strFullUrl = getNet().resolveUrl(strUrl); + + m_initialSyncNotify = new SyncNotification( strFullUrl, strParams, true ); + } + + public void setSyncStatusListener(ISyncStatusListener listener) + { + synchronized(m_mxSyncNotifications){ + m_syncStatusListener = listener; } } + + private void reportSyncStatus(String status, int error, String strDetails) + { + synchronized(m_mxSyncNotifications) + { + if (m_syncStatusListener != null) { + if ( strDetails.length() == 0 ) + strDetails = RhoRuby.getErrorText(error); + status += (strDetails.length() > 0 ? RhoRuby.getMessageText("details") + strDetails: ""); + + LOG.INFO("Status: "+status); + + m_syncStatusListener.reportStatus( status, error); + } + } + } void fireAllSyncNotifications( boolean bFinish, int nErrCode, String strMessage, Vector/*Ptr<CSyncSource*>&*/ sources ) { for( int i = 0; i < sources.size(); i++ ) { doFireSyncNotification( (SyncSource)sources.elementAt(i), bFinish, nErrCode, strMessage ); } } + + void fireInitialSyncNotification( boolean bFinish, int nErrCode ) + { + if ( getSync().getState() == SyncEngine.esExit ) + return; + + //TODO: show report + if( nErrCode != RhoRuby.ERR_NONE) + { + String strMessage = RhoRuby.getMessageText("sync_failed_for") + "initial."; + reportSyncStatus(strMessage,nErrCode,""); + } + + try{ + boolean bRemoveAfterFire = bFinish; + String strBody = "", strUrl; + synchronized(m_mxSyncNotifications) + { + if ( m_initialSyncNotify == null ) + return; + + strUrl = m_initialSyncNotify.m_strUrl; + strBody = "rho_callback=1"; + strBody += "&status="; + if ( bFinish ) + { + if ( nErrCode == RhoRuby.ERR_NONE ) + strBody += "ok"; + else + { + if ( getSync().isStoppedByUser() ) + nErrCode = RhoRuby.ERR_CANCELBYUSER; + + strBody += "error"; + strBody += "&error_code=" + nErrCode; + } + } + else + strBody += "in_progress"; + + if ( m_initialSyncNotify.m_strParams.length() > 0 ) + strBody += "&" + m_initialSyncNotify.m_strParams; + + bRemoveAfterFire = bRemoveAfterFire && m_initialSyncNotify.m_bRemoveAfterFire; + } + + if ( bRemoveAfterFire ) + clearInitialSyncNotification(); + + LOG.INFO( "Fire initial notification.Url :" + strUrl + "; Body: " + strBody ); + + NetResponse resp = getNet().pushData( strUrl, strBody, null ); + if ( !resp.isOK() ) + LOG.ERROR( "Fire intial notification failed. Code: " + resp.getRespCode() + "; Error body: " + resp.getCharData() ); + else + { + String szData = resp.getCharData(); + if ( szData != null && szData.equals("stop") ) + { + clearInitialSyncNotification(); + } + } + }catch(Exception exc) + { + LOG.ERROR("Fire initial notification failed.", exc); + } + + } + void fireSyncNotification( SyncSource src, boolean bFinish, int nErrCode, String strMessage ) { if ( getSync().getState() == SyncEngine.esExit ) return; if( strMessage.length() > 0 || nErrCode != RhoRuby.ERR_NONE) { if ( !( src != null && src.m_strParams.length()>0) ) { if ( src != null && (strMessage==null || strMessage.length() == 0) ) - strMessage = "Sync failed for " + src.getName() + "."; + strMessage = RhoRuby.getMessageText("sync_failed_for") + src.getName() + "."; reportSyncStatus(strMessage,nErrCode,src!= null?src.m_strError:""); } } @@ -331,23 +438,25 @@ if ( src == null || getSync().isStoppedByUser() ) return; //TODO: implement all sources callback try{ String strBody = "", strUrl; + boolean bRemoveAfterFire = bFinish; { synchronized(m_mxSyncNotifications){ - SyncNotification sn = (SyncNotification)m_mapSyncNotifications.get(src.getID()); + SyncNotification sn = (SyncNotification)(src.isSearch() ? m_mapSearchNotifications.get(src.getID()) : m_mapSyncNotifications.get(src.getID())); if ( sn == null ) return; strUrl = sn.m_strUrl; strBody += "total_count=" + src.getTotalCount(); strBody += "&processed_count=" + src.getCurPageCount(); strBody += "&processed_objects_count=" + getLastSyncObjectCount(src.getID()); strBody += "&cumulative_count=" + src.getServerObjectsCount(); strBody += "&source_id=" + src.getID(); strBody += "&source_name=" + src.getName(); + strBody += "&rho_callback=1"; strBody += "&status="; if ( bFinish ) { if ( nErrCode == RhoRuby.ERR_NONE ) @@ -365,43 +474,70 @@ else strBody += "in_progress"; if ( sn.m_strParams.length() > 0 ) strBody += "&" + sn.m_strParams; + + bRemoveAfterFire = bRemoveAfterFire && sn.m_bRemoveAfterFire; } } - if ( bFinish ) - clearSyncNotification(src.getID().intValue()); + if ( bRemoveAfterFire ) + clearNotification(src); LOG.INFO( "Fire notification. Source ID: " + src.getID() + "; Url :" + strUrl + "; Body: " + strBody ); - NetResponse resp = getNet().pushData( strUrl, strBody, getSync() ); + NetResponse resp = getNet().pushData( strUrl, strBody, null ); if ( !resp.isOK() ) LOG.ERROR( "Fire notification failed. Code: " + resp.getRespCode() + "; Error body: " + resp.getCharData() ); else { String szData = resp.getCharData(); if ( szData != null && szData.equals("stop") ) { - clearSyncNotification(src.getID().intValue()); + clearNotification(src); } } }catch(Exception exc) { LOG.ERROR("Fire notification failed.", exc); } } + void clearNotification(SyncSource src) + { + LOG.INFO( "Clear notification. Source : " + src.getName()); + + synchronized(m_mxSyncNotifications) + { + if ( src.isSearch() ) + m_mapSearchNotifications.remove(src.getID()); + else + m_mapSyncNotifications.remove(src.getID()); + } + } + void clearSyncNotification(int source_id) { LOG.INFO( "Clear notification. Source ID: " + source_id ); synchronized(m_mxSyncNotifications){ - m_mapSyncNotifications.remove(new Integer(source_id)); + if ( source_id == -1 )//Clear all + m_mapSyncNotifications.clear(); + else + m_mapSyncNotifications.remove(new Integer(source_id)); } } + void clearInitialSyncNotification() + { + LOG.INFO( "Clear initial notification." ); + + synchronized(m_mxSyncNotifications){ + m_initialSyncNotify = null; + } + } + void cleanLastSyncObjectCount() { synchronized(m_mxSyncNotifications) { m_hashSrcObjectCount.clear();