platform/shared/rubyJVM/src/com/rho/sync/SyncNotify.java in rhodes-1.5.5 vs platform/shared/rubyJVM/src/com/rho/sync/SyncNotify.java in rhodes-2.0.0.beta1
- old
+ new
@@ -36,20 +36,18 @@
Hashtable/*<int,int>*/ m_hashSrcObjectCount = new Hashtable();
static Mutex m_mxObjectNotify = new Mutex();
Hashtable/*<int,SyncNotification>*/ m_mapSyncNotifications = new Hashtable();
- Hashtable/*<int,SyncNotification>*/ m_mapSearchNotifications = new Hashtable();
+ SyncNotification m_pSearchNotification;
+ SyncNotification m_bulkSyncNotify;
- SyncNotification m_initialSyncNotify;
-
Mutex m_mxSyncNotifications = new Mutex();
ISyncStatusListener m_syncStatusListener = null;
boolean m_bEnableReporting = false;
SyncEngine getSync(){ return m_syncEngine; }
- DBAdapter getDB(){ return getSync().getDB(); }
NetRequest getNet(){ return getSync().getNet(); }
SyncNotify( SyncEngine syncEngine )
{
m_syncEngine = syncEngine;
@@ -140,17 +138,19 @@
String strObject = (String)keysObject.nextElement();
if (nNotifyType == enNone.intValue())
continue;
+ //This is slow operation
+/*
if ( nNotifyType == enDelete.intValue() )
{
IDBResult res = getDB().executeSQL("SELECT object FROM object_values where object=? LIMIT 1 OFFSET 0", strObject );
if ( !res.isEnd() )
nNotifyType = enUpdate.intValue();
}
-
+*/
if ( strBody.length() > 0 )
strBody += "&rho_callback=1&";
if (nNotifyType == enDelete.intValue() )
{
@@ -172,14 +172,11 @@
if ( strBody.length() == 0 )
return;
}
- NetResponse resp = getNet().pushData( strUrl, strBody, null );
- if ( !resp.isOK() )
- LOG.ERROR( "Fire object notification failed. Code: " + resp.getRespCode() + "; Error body: " + resp.getCharData() );
-
+ callNotify(strUrl, strBody);
}
void onObjectChanged(Integer nSrcID, String strObject, Integer nType)
{
synchronized(m_mxObjectNotify)
@@ -273,11 +270,11 @@
synchronized(m_mxSyncNotifications){
m_mapSyncNotifications.clear();
if ( strFullUrl.length() > 0 )
{
- IDBResult res = getDB().executeSQL("SELECT source_id from sources order by source_id");
+ IDBResult res = DBAdapter.getUserDB().executeSQL("SELECT source_id from sources order by source_id");
for ( ; !res.isEnd(); res.next() )
m_mapSyncNotifications.put( new Integer(res.getIntByIdx(0)), new SyncNotification( strFullUrl, strParams, false ) );
}
}
LOG.INFO( " Done Set notification for all sources; Url :" + strFullUrl + "; Params: " + strParams );
@@ -292,29 +289,29 @@
LOG.INFO( " Done Set notification. Source ID: " + source_id + "; Url :" + strFullUrl + "; Params: " + strParams );
}
}
}
- void setSearchNotification(int source_id, String strUrl, String strParams )throws Exception
+ void setSearchNotification(String strUrl, String strParams )throws Exception
{
- LOG.INFO( "Set search notification. Source ID: " + source_id + "; Url :" + strUrl + "; Params: " + strParams );
+ LOG.INFO( "Set search notification. Url :" + strUrl + "; Params: " + strParams );
String strFullUrl = getNet().resolveUrl(strUrl);
if ( strFullUrl.length() > 0 )
{
synchronized(m_mxSyncNotifications){
- m_mapSearchNotifications.put(new Integer(source_id),new SyncNotification( strFullUrl, strParams, true ) );
+ m_pSearchNotification = new SyncNotification( strFullUrl, strParams, true );
}
- LOG.INFO( " Done Set search notification. Source ID: " + source_id + "; Url :" + strFullUrl + "; Params: " + strParams );
+ LOG.INFO( " Done Set search notification. Url :" + strFullUrl + "; Params: " + strParams );
}
}
- void setInitialSyncNotification(String strUrl, String strParams )throws Exception
+ void setBulkSyncNotification(String strUrl, String strParams )throws Exception
{
String strFullUrl = getNet().resolveUrl(strUrl);
- m_initialSyncNotify = new SyncNotification( strFullUrl, strParams, true );
+ m_bulkSyncNotify = new SyncNotification( strFullUrl, strParams, false );
}
public void setSyncStatusListener(ISyncStatusListener listener)
{
synchronized(m_mxSyncNotifications){
@@ -355,32 +352,34 @@
{
doFireSyncNotification( (SyncSource)sources.elementAt(i), bFinish, nErrCode, strMessage );
}
}
- void fireInitialSyncNotification( boolean bFinish, int nErrCode )
+ void fireBulkSyncNotification( boolean bFinish, String status, String partition, int nErrCode )
{
if ( getSync().getState() == SyncEngine.esExit )
return;
//TODO: show report
if( nErrCode != RhoRuby.ERR_NONE)
{
- String strMessage = RhoRuby.getMessageText("sync_failed_for") + "initial.";
+ String strMessage = RhoRuby.getMessageText("sync_failed_for") + "bulk.";
reportSyncStatus(strMessage,nErrCode,"");
}
try{
boolean bRemoveAfterFire = bFinish;
String strBody = "", strUrl;
synchronized(m_mxSyncNotifications)
{
- if ( m_initialSyncNotify == null )
+ if ( m_bulkSyncNotify == null )
return;
- strUrl = m_initialSyncNotify.m_strUrl;
+ strUrl = m_bulkSyncNotify.m_strUrl;
strBody = "rho_callback=1";
+ strBody += "&partition=" + partition;
+ strBody += "&bulk_status="+status;
strBody += "&status=";
if ( bFinish )
{
if ( nErrCode == RhoRuby.ERR_NONE )
strBody += "ok";
@@ -394,36 +393,26 @@
}
}
else
strBody += "in_progress";
- if ( m_initialSyncNotify.m_strParams.length() > 0 )
- strBody += "&" + m_initialSyncNotify.m_strParams;
+ if ( m_bulkSyncNotify.m_strParams.length() > 0 )
+ strBody += "&" + m_bulkSyncNotify.m_strParams;
- bRemoveAfterFire = bRemoveAfterFire && m_initialSyncNotify.m_bRemoveAfterFire;
+ bRemoveAfterFire = bRemoveAfterFire && m_bulkSyncNotify.m_bRemoveAfterFire;
}
if ( bRemoveAfterFire )
- clearInitialSyncNotification();
+ clearBulkSyncNotification();
- LOG.INFO( "Fire initial notification.Url :" + strUrl + "; Body: " + strBody );
-
- NetResponse resp = getNet().pushData( strUrl, strBody, null );
- if ( !resp.isOK() )
- LOG.ERROR( "Fire intial notification failed. Code: " + resp.getRespCode() + "; Error body: " + resp.getCharData() );
- else
- {
- String szData = resp.getCharData();
- if ( szData != null && szData.equals("stop") )
- {
- clearInitialSyncNotification();
- }
- }
-
+ LOG.INFO( "Fire bulk notification.Url :" + strUrl + "; Body: " + strBody );
+
+ if ( callNotify(strUrl, strBody) )
+ clearBulkSyncNotification();
}catch(Exception exc)
{
- LOG.ERROR("Fire initial notification failed.", exc);
+ LOG.ERROR("Fire bulk notification failed.", exc);
}
}
void fireSyncNotification( SyncSource src, boolean bFinish, int nErrCode, String strMessage )
@@ -431,11 +420,11 @@
if ( getSync().getState() == SyncEngine.esExit )
return;
if( strMessage.length() > 0 || nErrCode != RhoRuby.ERR_NONE)
{
- if ( !( src != null && src.m_strParams.length()>0) )
+ if ( !( src != null && src.isSearch()) )
{
if ( src != null && (strMessage==null || strMessage.length() == 0) )
strMessage = RhoRuby.getMessageText("sync_failed_for") + src.getName() + ".";
reportSyncStatus(strMessage,nErrCode,src!= null?src.m_strError:"");
@@ -453,11 +442,16 @@
try{
String strBody = "", strUrl;
boolean bRemoveAfterFire = bFinish;
{
synchronized(m_mxSyncNotifications){
- SyncNotification sn = (SyncNotification)(src.isSearch() ? m_mapSearchNotifications.get(src.getID()) : m_mapSyncNotifications.get(src.getID()));
+ SyncNotification sn = null;
+ if ( src.isSearch() )
+ sn = m_pSearchNotification;
+ else
+ sn = (SyncNotification)m_mapSyncNotifications.get(src.getID());
+
if ( sn == null )
return;
strUrl = sn.m_strUrl;
strBody += "total_count=" + src.getTotalCount();
@@ -494,37 +488,55 @@
}
if ( bRemoveAfterFire )
clearNotification(src);
LOG.INFO( "Fire notification. Source ID: " + src.getID() + "; Url :" + strUrl + "; Body: " + strBody );
-
- NetResponse resp = getNet().pushData( strUrl, strBody, null );
- if ( !resp.isOK() )
- LOG.ERROR( "Fire notification failed. Code: " + resp.getRespCode() + "; Error body: " + resp.getCharData() );
- else
- {
- String szData = resp.getCharData();
- if ( szData != null && szData.equals("stop") )
- {
- clearNotification(src);
- }
- }
-
+
+ if ( callNotify(strUrl, strBody) )
+ clearNotification(src);
}catch(Exception exc)
{
LOG.ERROR("Fire notification failed.", exc);
}
}
+ boolean callNotify(String strUrl, String strBody )throws Exception
+ {
+/* if ( getSync().isNoThreadedMode() )
+ {
+ const char* szName = strrchr(strUrl.c_str(), '/');
+ if (!szName)
+ szName = strUrl.c_str();
+ else
+ szName++;
+
+ String strName = "C_";
+ strName += szName;
+ rho_ruby_set_const( strName.c_str(), strBody.c_str());
+ return false;
+ }*/
+
+ NetResponse resp = getNet().pushData( strUrl, strBody, null );
+ if ( !resp.isOK() )
+ LOG.ERROR( "Fire object notification failed. Code: " + resp.getRespCode() + "; Error body: " + resp.getCharData() );
+ else
+ {
+ String szData = resp.getCharData();
+ return szData != null && szData.equals("stop");
+ }
+
+ return false;
+ }
+
void clearNotification(SyncSource src)
{
LOG.INFO( "Clear notification. Source : " + src.getName());
synchronized(m_mxSyncNotifications)
{
if ( src.isSearch() )
- m_mapSearchNotifications.remove(src.getID());
+ m_pSearchNotification = null;
else
m_mapSyncNotifications.remove(src.getID());
}
}
@@ -538,16 +550,16 @@
else
m_mapSyncNotifications.remove(new Integer(source_id));
}
}
- void clearInitialSyncNotification()
+ void clearBulkSyncNotification()
{
- LOG.INFO( "Clear initial notification." );
+ LOG.INFO( "Clear bulk notification." );
synchronized(m_mxSyncNotifications){
- m_initialSyncNotify = null;
+ m_bulkSyncNotify = null;
}
}
void cleanLastSyncObjectCount()
{
@@ -584,7 +596,26 @@
nCount = nCountVal.intValue();
}
return nCount;
}
+
+ void callLoginCallback(String callback, int nErrCode, String strMessage)
+ {
+ try{
+ String strBody = "error_code=" + nErrCode;
+ strBody += "&error_message=" + URI.urlEncode(strMessage != null? strMessage : "");
+ strBody += "&rho_callback=1";
+
+ String strUrl = getNet().resolveUrl(callback);
+
+ LOG.INFO( "Login callback: " + callback + ". Body: "+ strBody );
+
+ callNotify(strUrl, strBody);
+ }catch(Exception exc)
+ {
+ LOG.ERROR("Call Login callback failed.", exc);
+ }
+ }
+
}