platform/shared/rubyJVM/src/com/rho/sync/SyncNotify.java in rhodes-1.2.2 vs platform/shared/rubyJVM/src/com/rho/sync/SyncNotify.java in rhodes-1.4.0
- old
+ new
@@ -17,11 +17,14 @@
new RhoLogger("Sync");
static class SyncNotification
{
String m_strUrl, m_strParams;
- SyncNotification(String strUrl, String strParams){ m_strUrl = strUrl; m_strParams = strParams; }
+ boolean m_bRemoveAfterFire;
+
+ SyncNotification(String strUrl, String strParams, boolean bRemoveAfterFire)
+ { m_strUrl = strUrl; m_strParams = strParams; m_bRemoveAfterFire = bRemoveAfterFire; }
};
public static final Integer enNone = new Integer(0), enDelete=new Integer(1), enUpdate=new Integer(2), enCreate=new Integer(3);
SyncEngine m_syncEngine;
@@ -33,10 +36,14 @@
Hashtable/*<int,int>*/ m_hashSrcObjectCount = new Hashtable();
static Mutex m_mxObjectNotify = new Mutex();
Hashtable/*<int,SyncNotification>*/ m_mapSyncNotifications = new Hashtable();
+ Hashtable/*<int,SyncNotification>*/ m_mapSearchNotifications = new Hashtable();
+
+ SyncNotification m_initialSyncNotify;
+
Mutex m_mxSyncNotifications = new Mutex();
ISyncStatusListener m_syncStatusListener = null;
SyncEngine getSync(){ return m_syncEngine; }
DBAdapter getDB(){ return getSync().getDB(); }
@@ -140,11 +147,11 @@
if ( !res.isEnd() )
nNotifyType = enUpdate.intValue();
}
if ( strBody.length() > 0 )
- strBody += '&';
+ strBody += "&rho_callback=1&";
if (nNotifyType == enDelete.intValue() )
{
strBody += "deleted[][object]=" + strObject;
strBody += "&deleted[][source_id]=" + nSrcID;
@@ -164,11 +171,11 @@
if ( strBody.length() == 0 )
return;
}
- NetResponse resp = getNet().pushData( strUrl, strBody, getSync() );
+ NetResponse resp = getNet().pushData( strUrl, strBody, null );
if ( !resp.isOK() )
LOG.ERROR( "Fire object notification failed. Code: " + resp.getRespCode() + "; Error body: " + resp.getCharData() );
}
@@ -267,59 +274,159 @@
if ( strFullUrl.length() > 0 )
{
IDBResult res = getDB().executeSQL("SELECT source_id from sources order by source_id");
for ( ; !res.isEnd(); res.next() )
- m_mapSyncNotifications.put( new Integer(res.getIntByIdx(0)), new SyncNotification( strFullUrl, strParams ) );
+ m_mapSyncNotifications.put( new Integer(res.getIntByIdx(0)), new SyncNotification( strFullUrl, strParams, false ) );
}
}
LOG.INFO( " Done Set notification for all sources; Url :" + strFullUrl + "; Params: " + strParams );
}else
{
//clearSyncNotification(source_id);
if ( strFullUrl.length() > 0 )
{
synchronized(m_mxSyncNotifications){
- m_mapSyncNotifications.put(new Integer(source_id),new SyncNotification( strFullUrl, strParams ) );
+ m_mapSyncNotifications.put(new Integer(source_id),new SyncNotification( strFullUrl, strParams, true ) );
}
LOG.INFO( " Done Set notification. Source ID: " + source_id + "; Url :" + strFullUrl + "; Params: " + strParams );
}
}
}
- public void setSyncStatusListener(ISyncStatusListener listener) { m_syncStatusListener = listener; }
- private void reportSyncStatus(String status, int error, String strDetails) {
- if (m_syncStatusListener != null) {
- if ( strDetails.length() == 0 )
- strDetails = RhoRuby.getErrorText(error);
- status += (strDetails.length() > 0 ? " Details: " + strDetails: "");
-
- LOG.INFO("Status: "+status);
-
- m_syncStatusListener.reportStatus( status, error);
+ void setSearchNotification(int source_id, String strUrl, String strParams )throws Exception
+ {
+ LOG.INFO( "Set search notification. Source ID: " + source_id + "; Url :" + strUrl + "; Params: " + strParams );
+ String strFullUrl = getNet().resolveUrl(strUrl);
+
+ if ( strFullUrl.length() > 0 )
+ {
+ synchronized(m_mxSyncNotifications){
+ m_mapSearchNotifications.put(new Integer(source_id),new SyncNotification( strFullUrl, strParams, true ) );
+ }
+ LOG.INFO( " Done Set search notification. Source ID: " + source_id + "; Url :" + strFullUrl + "; Params: " + strParams );
+ }
+ }
+
+ void setInitialSyncNotification(String strUrl, String strParams )throws Exception
+ {
+ String strFullUrl = getNet().resolveUrl(strUrl);
+
+ m_initialSyncNotify = new SyncNotification( strFullUrl, strParams, true );
+ }
+
+ public void setSyncStatusListener(ISyncStatusListener listener)
+ {
+ synchronized(m_mxSyncNotifications){
+ m_syncStatusListener = listener;
}
}
+
+ private void reportSyncStatus(String status, int error, String strDetails)
+ {
+ synchronized(m_mxSyncNotifications)
+ {
+ if (m_syncStatusListener != null) {
+ if ( strDetails.length() == 0 )
+ strDetails = RhoRuby.getErrorText(error);
+ status += (strDetails.length() > 0 ? RhoRuby.getMessageText("details") + strDetails: "");
+
+ LOG.INFO("Status: "+status);
+
+ m_syncStatusListener.reportStatus( status, error);
+ }
+ }
+ }
void fireAllSyncNotifications( boolean bFinish, int nErrCode, String strMessage, Vector/*Ptr<CSyncSource*>&*/ sources )
{
for( int i = 0; i < sources.size(); i++ )
{
doFireSyncNotification( (SyncSource)sources.elementAt(i), bFinish, nErrCode, strMessage );
}
}
+
+ void fireInitialSyncNotification( boolean bFinish, int nErrCode )
+ {
+ if ( getSync().getState() == SyncEngine.esExit )
+ return;
+
+ //TODO: show report
+ if( nErrCode != RhoRuby.ERR_NONE)
+ {
+ String strMessage = RhoRuby.getMessageText("sync_failed_for") + "initial.";
+ reportSyncStatus(strMessage,nErrCode,"");
+ }
+
+ try{
+ boolean bRemoveAfterFire = bFinish;
+ String strBody = "", strUrl;
+ synchronized(m_mxSyncNotifications)
+ {
+ if ( m_initialSyncNotify == null )
+ return;
+
+ strUrl = m_initialSyncNotify.m_strUrl;
+ strBody = "rho_callback=1";
+ strBody += "&status=";
+ if ( bFinish )
+ {
+ if ( nErrCode == RhoRuby.ERR_NONE )
+ strBody += "ok";
+ else
+ {
+ if ( getSync().isStoppedByUser() )
+ nErrCode = RhoRuby.ERR_CANCELBYUSER;
+
+ strBody += "error";
+ strBody += "&error_code=" + nErrCode;
+ }
+ }
+ else
+ strBody += "in_progress";
+
+ if ( m_initialSyncNotify.m_strParams.length() > 0 )
+ strBody += "&" + m_initialSyncNotify.m_strParams;
+
+ bRemoveAfterFire = bRemoveAfterFire && m_initialSyncNotify.m_bRemoveAfterFire;
+ }
+
+ if ( bRemoveAfterFire )
+ clearInitialSyncNotification();
+
+ LOG.INFO( "Fire initial notification.Url :" + strUrl + "; Body: " + strBody );
+
+ NetResponse resp = getNet().pushData( strUrl, strBody, null );
+ if ( !resp.isOK() )
+ LOG.ERROR( "Fire intial notification failed. Code: " + resp.getRespCode() + "; Error body: " + resp.getCharData() );
+ else
+ {
+ String szData = resp.getCharData();
+ if ( szData != null && szData.equals("stop") )
+ {
+ clearInitialSyncNotification();
+ }
+ }
+ }catch(Exception exc)
+ {
+ LOG.ERROR("Fire initial notification failed.", exc);
+ }
+
+ }
+
void fireSyncNotification( SyncSource src, boolean bFinish, int nErrCode, String strMessage )
{
if ( getSync().getState() == SyncEngine.esExit )
return;
if( strMessage.length() > 0 || nErrCode != RhoRuby.ERR_NONE)
{
if ( !( src != null && src.m_strParams.length()>0) )
{
if ( src != null && (strMessage==null || strMessage.length() == 0) )
- strMessage = "Sync failed for " + src.getName() + ".";
+ strMessage = RhoRuby.getMessageText("sync_failed_for") + src.getName() + ".";
reportSyncStatus(strMessage,nErrCode,src!= null?src.m_strError:"");
}
}
@@ -331,23 +438,25 @@
if ( src == null || getSync().isStoppedByUser() )
return; //TODO: implement all sources callback
try{
String strBody = "", strUrl;
+ boolean bRemoveAfterFire = bFinish;
{
synchronized(m_mxSyncNotifications){
- SyncNotification sn = (SyncNotification)m_mapSyncNotifications.get(src.getID());
+ SyncNotification sn = (SyncNotification)(src.isSearch() ? m_mapSearchNotifications.get(src.getID()) : m_mapSyncNotifications.get(src.getID()));
if ( sn == null )
return;
strUrl = sn.m_strUrl;
strBody += "total_count=" + src.getTotalCount();
strBody += "&processed_count=" + src.getCurPageCount();
strBody += "&processed_objects_count=" + getLastSyncObjectCount(src.getID());
strBody += "&cumulative_count=" + src.getServerObjectsCount();
strBody += "&source_id=" + src.getID();
strBody += "&source_name=" + src.getName();
+ strBody += "&rho_callback=1";
strBody += "&status=";
if ( bFinish )
{
if ( nErrCode == RhoRuby.ERR_NONE )
@@ -365,43 +474,70 @@
else
strBody += "in_progress";
if ( sn.m_strParams.length() > 0 )
strBody += "&" + sn.m_strParams;
+
+ bRemoveAfterFire = bRemoveAfterFire && sn.m_bRemoveAfterFire;
}
}
- if ( bFinish )
- clearSyncNotification(src.getID().intValue());
+ if ( bRemoveAfterFire )
+ clearNotification(src);
LOG.INFO( "Fire notification. Source ID: " + src.getID() + "; Url :" + strUrl + "; Body: " + strBody );
- NetResponse resp = getNet().pushData( strUrl, strBody, getSync() );
+ NetResponse resp = getNet().pushData( strUrl, strBody, null );
if ( !resp.isOK() )
LOG.ERROR( "Fire notification failed. Code: " + resp.getRespCode() + "; Error body: " + resp.getCharData() );
else
{
String szData = resp.getCharData();
if ( szData != null && szData.equals("stop") )
{
- clearSyncNotification(src.getID().intValue());
+ clearNotification(src);
}
}
}catch(Exception exc)
{
LOG.ERROR("Fire notification failed.", exc);
}
}
+ void clearNotification(SyncSource src)
+ {
+ LOG.INFO( "Clear notification. Source : " + src.getName());
+
+ synchronized(m_mxSyncNotifications)
+ {
+ if ( src.isSearch() )
+ m_mapSearchNotifications.remove(src.getID());
+ else
+ m_mapSyncNotifications.remove(src.getID());
+ }
+ }
+
void clearSyncNotification(int source_id)
{
LOG.INFO( "Clear notification. Source ID: " + source_id );
synchronized(m_mxSyncNotifications){
- m_mapSyncNotifications.remove(new Integer(source_id));
+ if ( source_id == -1 )//Clear all
+ m_mapSyncNotifications.clear();
+ else
+ m_mapSyncNotifications.remove(new Integer(source_id));
}
}
+ void clearInitialSyncNotification()
+ {
+ LOG.INFO( "Clear initial notification." );
+
+ synchronized(m_mxSyncNotifications){
+ m_initialSyncNotify = null;
+ }
+ }
+
void cleanLastSyncObjectCount()
{
synchronized(m_mxSyncNotifications)
{
m_hashSrcObjectCount.clear();