platform/shared/rubyJVM/src/com/rho/sync/SyncNotify.java in rhodes-1.5.5 vs platform/shared/rubyJVM/src/com/rho/sync/SyncNotify.java in rhodes-2.0.0.beta1

- old
+ new

@@ -36,20 +36,18 @@ Hashtable/*<int,int>*/ m_hashSrcObjectCount = new Hashtable(); static Mutex m_mxObjectNotify = new Mutex(); Hashtable/*<int,SyncNotification>*/ m_mapSyncNotifications = new Hashtable(); - Hashtable/*<int,SyncNotification>*/ m_mapSearchNotifications = new Hashtable(); + SyncNotification m_pSearchNotification; + SyncNotification m_bulkSyncNotify; - SyncNotification m_initialSyncNotify; - Mutex m_mxSyncNotifications = new Mutex(); ISyncStatusListener m_syncStatusListener = null; boolean m_bEnableReporting = false; SyncEngine getSync(){ return m_syncEngine; } - DBAdapter getDB(){ return getSync().getDB(); } NetRequest getNet(){ return getSync().getNet(); } SyncNotify( SyncEngine syncEngine ) { m_syncEngine = syncEngine; @@ -140,17 +138,19 @@ String strObject = (String)keysObject.nextElement(); if (nNotifyType == enNone.intValue()) continue; + //This is slow operation +/* if ( nNotifyType == enDelete.intValue() ) { IDBResult res = getDB().executeSQL("SELECT object FROM object_values where object=? LIMIT 1 OFFSET 0", strObject ); if ( !res.isEnd() ) nNotifyType = enUpdate.intValue(); } - +*/ if ( strBody.length() > 0 ) strBody += "&rho_callback=1&"; if (nNotifyType == enDelete.intValue() ) { @@ -172,14 +172,11 @@ if ( strBody.length() == 0 ) return; } - NetResponse resp = getNet().pushData( strUrl, strBody, null ); - if ( !resp.isOK() ) - LOG.ERROR( "Fire object notification failed. Code: " + resp.getRespCode() + "; Error body: " + resp.getCharData() ); - + callNotify(strUrl, strBody); } void onObjectChanged(Integer nSrcID, String strObject, Integer nType) { synchronized(m_mxObjectNotify) @@ -273,11 +270,11 @@ synchronized(m_mxSyncNotifications){ m_mapSyncNotifications.clear(); if ( strFullUrl.length() > 0 ) { - IDBResult res = getDB().executeSQL("SELECT source_id from sources order by source_id"); + IDBResult res = DBAdapter.getUserDB().executeSQL("SELECT source_id from sources order by source_id"); for ( ; !res.isEnd(); res.next() ) m_mapSyncNotifications.put( new Integer(res.getIntByIdx(0)), new SyncNotification( strFullUrl, strParams, false ) ); } } LOG.INFO( " Done Set notification for all sources; Url :" + strFullUrl + "; Params: " + strParams ); @@ -292,29 +289,29 @@ LOG.INFO( " Done Set notification. Source ID: " + source_id + "; Url :" + strFullUrl + "; Params: " + strParams ); } } } - void setSearchNotification(int source_id, String strUrl, String strParams )throws Exception + void setSearchNotification(String strUrl, String strParams )throws Exception { - LOG.INFO( "Set search notification. Source ID: " + source_id + "; Url :" + strUrl + "; Params: " + strParams ); + LOG.INFO( "Set search notification. Url :" + strUrl + "; Params: " + strParams ); String strFullUrl = getNet().resolveUrl(strUrl); if ( strFullUrl.length() > 0 ) { synchronized(m_mxSyncNotifications){ - m_mapSearchNotifications.put(new Integer(source_id),new SyncNotification( strFullUrl, strParams, true ) ); + m_pSearchNotification = new SyncNotification( strFullUrl, strParams, true ); } - LOG.INFO( " Done Set search notification. Source ID: " + source_id + "; Url :" + strFullUrl + "; Params: " + strParams ); + LOG.INFO( " Done Set search notification. Url :" + strFullUrl + "; Params: " + strParams ); } } - void setInitialSyncNotification(String strUrl, String strParams )throws Exception + void setBulkSyncNotification(String strUrl, String strParams )throws Exception { String strFullUrl = getNet().resolveUrl(strUrl); - m_initialSyncNotify = new SyncNotification( strFullUrl, strParams, true ); + m_bulkSyncNotify = new SyncNotification( strFullUrl, strParams, false ); } public void setSyncStatusListener(ISyncStatusListener listener) { synchronized(m_mxSyncNotifications){ @@ -355,32 +352,34 @@ { doFireSyncNotification( (SyncSource)sources.elementAt(i), bFinish, nErrCode, strMessage ); } } - void fireInitialSyncNotification( boolean bFinish, int nErrCode ) + void fireBulkSyncNotification( boolean bFinish, String status, String partition, int nErrCode ) { if ( getSync().getState() == SyncEngine.esExit ) return; //TODO: show report if( nErrCode != RhoRuby.ERR_NONE) { - String strMessage = RhoRuby.getMessageText("sync_failed_for") + "initial."; + String strMessage = RhoRuby.getMessageText("sync_failed_for") + "bulk."; reportSyncStatus(strMessage,nErrCode,""); } try{ boolean bRemoveAfterFire = bFinish; String strBody = "", strUrl; synchronized(m_mxSyncNotifications) { - if ( m_initialSyncNotify == null ) + if ( m_bulkSyncNotify == null ) return; - strUrl = m_initialSyncNotify.m_strUrl; + strUrl = m_bulkSyncNotify.m_strUrl; strBody = "rho_callback=1"; + strBody += "&partition=" + partition; + strBody += "&bulk_status="+status; strBody += "&status="; if ( bFinish ) { if ( nErrCode == RhoRuby.ERR_NONE ) strBody += "ok"; @@ -394,36 +393,26 @@ } } else strBody += "in_progress"; - if ( m_initialSyncNotify.m_strParams.length() > 0 ) - strBody += "&" + m_initialSyncNotify.m_strParams; + if ( m_bulkSyncNotify.m_strParams.length() > 0 ) + strBody += "&" + m_bulkSyncNotify.m_strParams; - bRemoveAfterFire = bRemoveAfterFire && m_initialSyncNotify.m_bRemoveAfterFire; + bRemoveAfterFire = bRemoveAfterFire && m_bulkSyncNotify.m_bRemoveAfterFire; } if ( bRemoveAfterFire ) - clearInitialSyncNotification(); + clearBulkSyncNotification(); - LOG.INFO( "Fire initial notification.Url :" + strUrl + "; Body: " + strBody ); - - NetResponse resp = getNet().pushData( strUrl, strBody, null ); - if ( !resp.isOK() ) - LOG.ERROR( "Fire intial notification failed. Code: " + resp.getRespCode() + "; Error body: " + resp.getCharData() ); - else - { - String szData = resp.getCharData(); - if ( szData != null && szData.equals("stop") ) - { - clearInitialSyncNotification(); - } - } - + LOG.INFO( "Fire bulk notification.Url :" + strUrl + "; Body: " + strBody ); + + if ( callNotify(strUrl, strBody) ) + clearBulkSyncNotification(); }catch(Exception exc) { - LOG.ERROR("Fire initial notification failed.", exc); + LOG.ERROR("Fire bulk notification failed.", exc); } } void fireSyncNotification( SyncSource src, boolean bFinish, int nErrCode, String strMessage ) @@ -431,11 +420,11 @@ if ( getSync().getState() == SyncEngine.esExit ) return; if( strMessage.length() > 0 || nErrCode != RhoRuby.ERR_NONE) { - if ( !( src != null && src.m_strParams.length()>0) ) + if ( !( src != null && src.isSearch()) ) { if ( src != null && (strMessage==null || strMessage.length() == 0) ) strMessage = RhoRuby.getMessageText("sync_failed_for") + src.getName() + "."; reportSyncStatus(strMessage,nErrCode,src!= null?src.m_strError:""); @@ -453,11 +442,16 @@ try{ String strBody = "", strUrl; boolean bRemoveAfterFire = bFinish; { synchronized(m_mxSyncNotifications){ - SyncNotification sn = (SyncNotification)(src.isSearch() ? m_mapSearchNotifications.get(src.getID()) : m_mapSyncNotifications.get(src.getID())); + SyncNotification sn = null; + if ( src.isSearch() ) + sn = m_pSearchNotification; + else + sn = (SyncNotification)m_mapSyncNotifications.get(src.getID()); + if ( sn == null ) return; strUrl = sn.m_strUrl; strBody += "total_count=" + src.getTotalCount(); @@ -494,37 +488,55 @@ } if ( bRemoveAfterFire ) clearNotification(src); LOG.INFO( "Fire notification. Source ID: " + src.getID() + "; Url :" + strUrl + "; Body: " + strBody ); - - NetResponse resp = getNet().pushData( strUrl, strBody, null ); - if ( !resp.isOK() ) - LOG.ERROR( "Fire notification failed. Code: " + resp.getRespCode() + "; Error body: " + resp.getCharData() ); - else - { - String szData = resp.getCharData(); - if ( szData != null && szData.equals("stop") ) - { - clearNotification(src); - } - } - + + if ( callNotify(strUrl, strBody) ) + clearNotification(src); }catch(Exception exc) { LOG.ERROR("Fire notification failed.", exc); } } + boolean callNotify(String strUrl, String strBody )throws Exception + { +/* if ( getSync().isNoThreadedMode() ) + { + const char* szName = strrchr(strUrl.c_str(), '/'); + if (!szName) + szName = strUrl.c_str(); + else + szName++; + + String strName = "C_"; + strName += szName; + rho_ruby_set_const( strName.c_str(), strBody.c_str()); + return false; + }*/ + + NetResponse resp = getNet().pushData( strUrl, strBody, null ); + if ( !resp.isOK() ) + LOG.ERROR( "Fire object notification failed. Code: " + resp.getRespCode() + "; Error body: " + resp.getCharData() ); + else + { + String szData = resp.getCharData(); + return szData != null && szData.equals("stop"); + } + + return false; + } + void clearNotification(SyncSource src) { LOG.INFO( "Clear notification. Source : " + src.getName()); synchronized(m_mxSyncNotifications) { if ( src.isSearch() ) - m_mapSearchNotifications.remove(src.getID()); + m_pSearchNotification = null; else m_mapSyncNotifications.remove(src.getID()); } } @@ -538,16 +550,16 @@ else m_mapSyncNotifications.remove(new Integer(source_id)); } } - void clearInitialSyncNotification() + void clearBulkSyncNotification() { - LOG.INFO( "Clear initial notification." ); + LOG.INFO( "Clear bulk notification." ); synchronized(m_mxSyncNotifications){ - m_initialSyncNotify = null; + m_bulkSyncNotify = null; } } void cleanLastSyncObjectCount() { @@ -584,7 +596,26 @@ nCount = nCountVal.intValue(); } return nCount; } + + void callLoginCallback(String callback, int nErrCode, String strMessage) + { + try{ + String strBody = "error_code=" + nErrCode; + strBody += "&error_message=" + URI.urlEncode(strMessage != null? strMessage : ""); + strBody += "&rho_callback=1"; + + String strUrl = getNet().resolveUrl(callback); + + LOG.INFO( "Login callback: " + callback + ". Body: "+ strBody ); + + callNotify(strUrl, strBody); + }catch(Exception exc) + { + LOG.ERROR("Call Login callback failed.", exc); + } + } + }