From 93211d7ef96ac80175db073960bb6e989aae053c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 5 Jan 2021 17:36:11 +0800 Subject: [PATCH] [TD-2567]: fix bug in continuous query. --- src/client/src/tscStream.c | 51 ++++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index c1ed9b0ba0..90e67f39a1 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -65,44 +65,51 @@ static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, in return retryDelta; } -static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { - SSqlStream *pStream = (SSqlStream *)pMsg->ahandle; - SSqlObj * pSql = pStream->pSql; +static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) { + SSqlStream *pStream = (SSqlStream *)param; + assert(pStream->pSql == tres && code == TSDB_CODE_SUCCESS); - pSql->fp = tscProcessStreamQueryCallback; - pSql->fetchFp = tscProcessStreamQueryCallback; - pSql->param = pStream; + SSqlObj* pSql = (SSqlObj*) tres; + pSql->fp = doLaunchQuery; + pSql->fetchFp = doLaunchQuery; pSql->res.completed = false; - + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - int code = tscGetTableMeta(pSql, pTableMetaInfo); - pSql->res.code = code; - + code = tscGetTableMeta(pSql, pTableMetaInfo); if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { code = tscGetSTableVgroupInfo(pSql, 0); - pSql->res.code = code; } - // failed to get meter/metric meta, retry in 10sec. - if (code != TSDB_CODE_SUCCESS) { - int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision); - tscDebug("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime); - tscSetRetryTimer(pStream, pSql, retryDelayTime); - - } else { + // failed to get table Meta or vgroup list, retry in 10sec. + if (code == TSDB_CODE_SUCCESS) { tscTansformSQLFuncForSTableQuery(pQueryInfo); - tscDebug("%p stream:%p start stream query on:%s", pSql, pStream, pTableMetaInfo->name); - tscDoQuery(pStream->pSql); + tscDebug("%p stream:%p, start stream query on:%s", pSql, pStream, pTableMetaInfo->name); + + pSql->fp = tscProcessStreamQueryCallback; + pSql->fetchFp = tscProcessStreamQueryCallback; + tscDoQuery(pSql); tscIncStreamExecutionCount(pStream); + } else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + pSql->res.code = code; + int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision); + tscDebug("%p stream:%p, get table Meta failed, retry in %" PRId64 "ms", pSql, pStream, retryDelayTime); + tscSetRetryTimer(pStream, pSql, retryDelayTime); } } +static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { + SSqlStream *pStream = (SSqlStream *)pMsg->ahandle; + doLaunchQuery(pStream, pStream->pSql, 0); +} + static void tscProcessStreamTimer(void *handle, void *tmrId) { SSqlStream *pStream = (SSqlStream *)handle; - if (pStream == NULL) return; - if (pStream->pTimer != tmrId) return; + if (pStream == NULL || pStream->pTimer != tmrId) { + return; + } + pStream->pTimer = NULL; pStream->numOfRes = 0; // reset the numOfRes.