[TD-2701]<fix>: fix crash in cq

This commit is contained in:
Haojun Liao 2021-01-08 16:55:24 +08:00
parent 400bec62ae
commit 55cfce602b
1 changed files with 23 additions and 7 deletions

View File

@ -65,15 +65,30 @@ static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, in
return retryDelta;
}
static void setRetryInfo(SSqlStream* pStream, int32_t code) {
SSqlObj* pSql = pStream->pSql;
pSql->res.code = code;
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
tscDebug("%p stream:%p, get table Meta failed, retry in %" PRId64 "ms", pSql, pStream, retryDelayTime);
tscSetRetryTimer(pStream, pSql, retryDelayTime);
}
static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
SSqlStream *pStream = (SSqlStream *)param;
assert(pStream->pSql == tres && code == TSDB_CODE_SUCCESS);
assert(pStream->pSql == tres);
SSqlObj* pSql = (SSqlObj*) tres;
pSql->fp = doLaunchQuery;
pSql->fp = doLaunchQuery;
pSql->fetchFp = doLaunchQuery;
pSql->res.completed = false;
if (code != TSDB_CODE_SUCCESS) {
setRetryInfo(pStream, code);
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
@ -82,6 +97,10 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
code = tscGetSTableVgroupInfo(pSql, 0);
}
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
}
// failed to get table Meta or vgroup list, retry in 10sec.
if (code == TSDB_CODE_SUCCESS) {
tscTansformSQLFuncForSTableQuery(pQueryInfo);
@ -91,11 +110,8 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
pSql->fetchFp = tscProcessStreamQueryCallback;
tscDoQuery(pSql);
tscIncStreamExecutionCount(pStream);
} else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
pSql->res.code = code;
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
tscDebug("%p stream:%p, get table Meta failed, retry in %" PRId64 "ms", pSql, pStream, retryDelayTime);
tscSetRetryTimer(pStream, pSql, retryDelayTime);
} else {
setRetryInfo(pStream, code);
}
}