[td-225]fix bug found by regression test.
This commit is contained in:
parent
b45f8b24f0
commit
e40c1ffca5
|
@ -102,7 +102,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
|
|||
}
|
||||
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->pVgroupTables == NULL) && (pTableMetaInfo->vgroupList == NULL || pTableMetaInfo->vgroupList->numOfVgroups <= 0)) {
|
||||
tscDebug("%p empty vgroup list", pSql);
|
||||
tscDebug("0x%"PRIx64" empty vgroup list", pSql->self);
|
||||
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
|
||||
code = TSDB_CODE_TSC_APP_ERROR;
|
||||
}
|
||||
|
@ -110,10 +110,9 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
|
|||
// failed to get table Meta or vgroup list, retry in 10sec.
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
tscTansformFuncForSTableQuery(pQueryInfo);
|
||||
tscDebug("0x%"PRIx64" stream:%p, start stream query on:%s", pSql->self, pStream, tNameGetTableName(&pTableMetaInfo->name));
|
||||
tscDebug("0x%"PRIx64" stream:%p started to query table:%s", pSql->self, pStream, tNameGetTableName(&pTableMetaInfo->name));
|
||||
|
||||
pQueryInfo->command = TSDB_SQL_SELECT;
|
||||
pSql->cmd.active = pQueryInfo;
|
||||
|
||||
pSql->fp = tscProcessStreamQueryCallback;
|
||||
pSql->fetchFp = tscProcessStreamQueryCallback;
|
||||
|
@ -140,7 +139,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
|
|||
pStream->numOfRes = 0; // reset the numOfRes.
|
||||
SSqlObj *pSql = pStream->pSql;
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
|
||||
tscDebug("0x%"PRIx64" add into timer", pSql->self);
|
||||
tscDebug("0x%"PRIx64" timer launch query", pSql->self);
|
||||
|
||||
if (pStream->isProject) {
|
||||
/*
|
||||
|
@ -195,8 +194,8 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
|
|||
SSqlStream *pStream = (SSqlStream *)param;
|
||||
if (tres == NULL || numOfRows < 0) {
|
||||
int64_t retryDelay = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
|
||||
tscError("%p stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows,
|
||||
retryDelay);
|
||||
tscError("0x%"PRIx64" stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql->self,
|
||||
pStream, numOfRows, retryDelay);
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
|
||||
|
||||
|
@ -260,13 +259,14 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
|
|||
|
||||
if (pSql == NULL || numOfRows < 0) {
|
||||
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
|
||||
tscError("%p stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
|
||||
tscError("0x%"PRIx64" stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 " ms", pSql->self, pStream, numOfRows, retryDelayTime);
|
||||
|
||||
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
|
||||
return;
|
||||
}
|
||||
|
||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
|
||||
STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
|
||||
|
||||
if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful.
|
||||
for(int32_t i = 0; i < numOfRows; ++i) {
|
||||
|
@ -293,7 +293,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
|
|||
/* no resuls in the query range, retry */
|
||||
// todo set retry dynamic time
|
||||
int32_t retry = tsProjectExecInterval;
|
||||
tscError("%p stream:%p, retrieve no data, code:0x%08x, retry in %" PRId32 "ms", pSql, pStream, numOfRows, retry);
|
||||
tscError("0x%"PRIx64" stream:%p, retrieve no data, code:0x%08x, retry in %" PRId32 "ms", pSql->self, pStream, numOfRows, retry);
|
||||
|
||||
tscSetRetryTimer(pStream, pStream->pSql, retry);
|
||||
return;
|
||||
|
@ -306,6 +306,10 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
|
|||
pStream->numOfRes);
|
||||
|
||||
tfree(pTableMetaInfo->pTableMeta);
|
||||
if (pQueryInfo->pQInfo != NULL) {
|
||||
qDestroyQueryInfo(pQueryInfo->pQInfo);
|
||||
pQueryInfo->pQInfo = NULL;
|
||||
}
|
||||
|
||||
tscFreeSqlResult(pSql);
|
||||
tscFreeSubobj(pSql);
|
||||
|
@ -338,10 +342,10 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
|
|||
return;
|
||||
}
|
||||
|
||||
tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream,
|
||||
tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 "(ts window ekey), in %" PRId64 " ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream,
|
||||
now + timer, timer, delay, pStream->stime, etime);
|
||||
} else {
|
||||
tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream,
|
||||
tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 "(ts window ekey), in %" PRId64 " ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream,
|
||||
pStream->stime, timer, delay, pStream->stime - pStream->interval.interval, pStream->stime - 1);
|
||||
}
|
||||
|
||||
|
@ -399,7 +403,6 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
|
|||
}
|
||||
} else {
|
||||
int64_t stime = taosTimeTruncate(pStream->stime - 1, &pStream->interval, pStream->precision);
|
||||
//int64_t stime = taosGetIntervalStartTimestamp(pStream->stime - 1, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
|
||||
if (stime >= pStream->etime) {
|
||||
tscDebug("0x%"PRIx64" stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql->self, pStream,
|
||||
pStream->stime, pStream->etime);
|
||||
|
@ -441,7 +444,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
|
|||
}
|
||||
|
||||
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit!= 'y' && pQueryInfo->interval.interval < minIntervalTime) {
|
||||
tscWarn("%p stream:%p, original sample interval:%" PRId64 " too small, reset to:%" PRId64, pSql, pStream,
|
||||
tscWarn("0x%"PRIx64" stream:%p, original sample interval:%" PRId64 " too small, reset to:%" PRId64, pSql->self, pStream,
|
||||
(int64_t)pQueryInfo->interval.interval, minIntervalTime);
|
||||
pQueryInfo->interval.interval = minIntervalTime;
|
||||
}
|
||||
|
@ -458,15 +461,15 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
|
|||
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime;
|
||||
|
||||
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit!= 'y' && pQueryInfo->interval.sliding < minSlidingTime) {
|
||||
tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64, pSql, pStream,
|
||||
tscWarn("0x%"PRIx64" stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64, pSql->self, pStream,
|
||||
pQueryInfo->interval.sliding, minSlidingTime);
|
||||
|
||||
pQueryInfo->interval.sliding = minSlidingTime;
|
||||
}
|
||||
|
||||
if (pQueryInfo->interval.sliding > pQueryInfo->interval.interval) {
|
||||
tscWarn("%p stream:%p, sliding value:%" PRId64 " can not be larger than interval range, reset to:%" PRId64, pSql, pStream,
|
||||
pQueryInfo->interval.sliding, pQueryInfo->interval.interval);
|
||||
tscWarn("0x%"PRIx64" stream:%p, sliding value:%" PRId64 " can not be larger than interval range, reset to:%" PRId64,
|
||||
pSql->self, pStream, pQueryInfo->interval.sliding, pQueryInfo->interval.interval);
|
||||
|
||||
pQueryInfo->interval.sliding = pQueryInfo->interval.interval;
|
||||
}
|
||||
|
@ -508,7 +511,7 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
|
|||
} else {
|
||||
int64_t newStime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
|
||||
if (newStime != stime) {
|
||||
tscWarn("%p stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql, pStream, stime, newStime);
|
||||
tscWarn("0x%"PRIx64" stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql->self, pStream, stime, newStime);
|
||||
stime = newStime;
|
||||
}
|
||||
}
|
||||
|
@ -539,7 +542,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
|
|||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pSql->res.code = code;
|
||||
tscError("%p open stream failed, sql:%s, reason:%s, code:%s", pSql, pSql->sqlstr, pCmd->payload, tstrerror(code));
|
||||
tscError("0x%"PRIx64" open stream failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code));
|
||||
|
||||
pStream->fp(pStream->param, NULL, NULL);
|
||||
return;
|
||||
|
@ -558,7 +561,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
|
|||
if (tscSetSlidingWindowInfo(pSql, pStream) != TSDB_CODE_SUCCESS) {
|
||||
pSql->res.code = code;
|
||||
|
||||
tscError("%p stream %p open failed, since the interval value is incorrect", pSql, pStream);
|
||||
tscError("0x%"PRIx64" stream %p open failed, since the interval value is incorrect", pSql->self, pStream);
|
||||
pStream->fp(pStream->param, NULL, NULL);
|
||||
return;
|
||||
}
|
||||
|
@ -621,19 +624,19 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
|
|||
|
||||
strtolower(pSql->sqlstr, sqlstr);
|
||||
|
||||
tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
|
||||
registerSqlObj(pSql);
|
||||
|
||||
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
|
||||
tsem_init(&pSql->rspSem, 0, 0);
|
||||
|
||||
pSql->fp = tscCreateStream;
|
||||
pSql->fetchFp = tscCreateStream;
|
||||
|
||||
registerSqlObj(pSql);
|
||||
|
||||
int32_t code = tsParseSql(pSql, true);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
tscCreateStream(pStream, pSql, code);
|
||||
} else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
tscError("%p open stream failed, sql:%s, code:%s", pSql, sqlstr, tstrerror(code));
|
||||
tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code));
|
||||
taosReleaseRef(tscObjRef, pSql->self);
|
||||
free(pStream);
|
||||
return NULL;
|
||||
|
|
|
@ -2508,11 +2508,12 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
|||
assert(pNewQueryInfo->tsBuf != NULL);
|
||||
}
|
||||
|
||||
tscDebug("0x%"PRIx64" sub:%p create subquery success. orderOfSub:%d", pSql->self, pNew, trs->subqueryIndex);
|
||||
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" create subquery success. orderOfSub:%d", pSql->self, pNew->self,
|
||||
trs->subqueryIndex);
|
||||
}
|
||||
|
||||
if (i < pState->numOfSub) {
|
||||
tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
|
||||
tscError("0x%"PRIx64" failed to prepare subquery structure and launch subqueries", pSql->self);
|
||||
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
|
||||
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
|
||||
|
@ -2715,8 +2716,8 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
|
|||
// data in from current vnode is stored in cache and disk
|
||||
uint32_t numOfRowsFromSubquery = (uint32_t)(trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num);
|
||||
SVgroupsInfo* vgroupsInfo = pTableMetaInfo->vgroupList;
|
||||
tscDebug("0x%"PRIx64" sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql->self, pSql,
|
||||
vgroupsInfo->vgroups[0].epAddr[0].fqdn, vgroupsInfo->vgroups[0].vgId, numOfRowsFromSubquery, idx);
|
||||
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql->self,
|
||||
pSql->self, vgroupsInfo->vgroups[0].epAddr[0].fqdn, vgroupsInfo->vgroups[0].vgId, numOfRowsFromSubquery, idx);
|
||||
|
||||
tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
|
||||
|
||||
|
@ -2729,8 +2730,8 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
|
|||
#endif
|
||||
|
||||
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
|
||||
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql, pSql,
|
||||
tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
|
||||
tscError("0x%"PRIx64" sub:0x%"PRIx64" client disk space remain %.3f GB, need at least %.3f GB, stop query",
|
||||
pParentSql->self, pSql->self, tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
|
||||
tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE);
|
||||
return;
|
||||
}
|
||||
|
@ -2744,7 +2745,8 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
|
|||
}
|
||||
|
||||
if (!subAndCheckDone(pSql, pParentSql, idx)) {
|
||||
tscDebug("0x%"PRIx64" sub:%p orderOfSub:%d freed, not finished", pParentSql->self, pSql, trsupport->subqueryIndex);
|
||||
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d freed, not finished", pParentSql->self, pSql->self,
|
||||
trsupport->subqueryIndex);
|
||||
|
||||
tscFreeRetrieveSup(pSql);
|
||||
return;
|
||||
|
|
|
@ -832,7 +832,7 @@ void tscFreeSqlResult(SSqlObj* pSql) {
|
|||
|
||||
SSqlRes* pRes = &pSql->res;
|
||||
tscDestroyResPointerInfo(pRes);
|
||||
|
||||
|
||||
memset(&pSql->res, 0, sizeof(SSqlRes));
|
||||
}
|
||||
|
||||
|
@ -2739,13 +2739,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
|
|||
assert(pFinalInfo->vgroupList != NULL);
|
||||
}
|
||||
|
||||
registerSqlObj(pNew);
|
||||
|
||||
if (cmd == TSDB_SQL_SELECT) {
|
||||
size_t size = taosArrayGetSize(pNewQueryInfo->colList);
|
||||
|
||||
tscDebug(
|
||||
"%p new subquery:%p, tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ","
|
||||
tscDebug("0x%"PRIx64" new subquery:0x%"PRIx64", tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ","
|
||||
"fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64,
|
||||
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
|
||||
pSql->self, pNew->self, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
|
||||
size, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pFinalInfo->name), pNewQueryInfo->window.skey,
|
||||
pNewQueryInfo->window.ekey, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit);
|
||||
|
||||
|
@ -2754,7 +2755,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
|
|||
tscDebug("0x%"PRIx64" new sub insertion: %p, vnodeIdx:%d", pSql->self, pNew, pTableMetaInfo->vgroupIndex);
|
||||
}
|
||||
|
||||
registerSqlObj(pNew);
|
||||
return pNew;
|
||||
|
||||
_error:
|
||||
|
|
|
@ -537,7 +537,7 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes
|
|||
|
||||
int64_t endt = taosGetTimestampMs();
|
||||
|
||||
qDebug("QInfo:%"PRIu64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_QID(pRuntimeEnv),
|
||||
qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_QID(pRuntimeEnv),
|
||||
pGroupResInfo->currentGroup, endt - startt);
|
||||
|
||||
_end:
|
||||
|
|
|
@ -55,6 +55,4 @@ run general/parser/sliding.sim
|
|||
run general/parser/function.sim
|
||||
run general/parser/stableOp.sim
|
||||
run general/parser/having.sim
|
||||
run general/parser/having_child.sim
|
||||
|
||||
|
||||
run general/parser/having_child.sim
|
Loading…
Reference in New Issue