fix bugs found in regression test.
This commit is contained in:
parent
960d40d4b4
commit
8928dea534
|
@ -2795,7 +2795,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
int32_t condLen = 0;
|
||||
if (pTagCond->numOfTagCond > 0) {
|
||||
SCond *pCond = tsGetMetricQueryCondPos(pTagCond, uid);
|
||||
if (pCond != NULL) {
|
||||
if (pCond != NULL && pCond->cond != NULL) {
|
||||
condLen = strlen(pCond->cond) + 1;
|
||||
|
||||
bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
|
||||
|
|
|
@ -64,7 +64,7 @@ void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* str, uint64_t uid) {
|
|||
size_t redundantLen = 20;
|
||||
|
||||
size_t bufSize = strlen(pMeterMetaInfo->name) + tbnameCondLen + strlen(join) + strlen(tagIdBuf);
|
||||
if (cond != NULL) {
|
||||
if (cond != NULL && cond->cond != NULL) {
|
||||
bufSize += strlen(cond->cond);
|
||||
}
|
||||
|
||||
|
@ -72,7 +72,7 @@ void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* str, uint64_t uid) {
|
|||
char* tmp = calloc(1, bufSize);
|
||||
|
||||
int32_t keyLen = snprintf(tmp, bufSize, "%s,%s,%s,%d,%s,[%s],%d", pMeterMetaInfo->name,
|
||||
(cond != NULL ? cond->cond : NULL), (tbnameCondLen > 0 ? pTagCond->tbnameCond.cond : NULL),
|
||||
((cond != NULL && cond->cond != NULL) ? cond->cond : NULL), (tbnameCondLen > 0 ? pTagCond->tbnameCond.cond : NULL),
|
||||
pTagCond->relType, join, tagIdBuf, pQueryInfo->groupbyExpr.orderType);
|
||||
|
||||
assert(keyLen <= bufSize);
|
||||
|
|
|
@ -178,7 +178,7 @@ SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeE
|
|||
// SMeterDataInfo* pDataHeadInfoEx, SField* pFields,
|
||||
// __block_search_fn_t searchFn);
|
||||
|
||||
void applyIntervalQueryOnBlock(STableQuerySupportObj* pSupporter, SMeterDataInfo* pMeterDataInfo,
|
||||
void stableApplyFunctionsOnBlock(STableQuerySupportObj* pSupporter, SMeterDataInfo* pMeterDataInfo,
|
||||
SBlockInfo* pBlockInfo, SField* pFields, __block_search_fn_t searchFn);
|
||||
|
||||
int32_t vnodeFilterQualifiedMeters(SQInfo* pQInfo, int32_t vid, tSidSet* pSidSet, SMeterDataInfo* pMeterDataInfo,
|
||||
|
|
|
@ -190,10 +190,7 @@ typedef struct SMeterQueryInfo {
|
|||
int64_t skey;
|
||||
int64_t ekey;
|
||||
int32_t numOfRes;
|
||||
// int32_t reverseIndex; // reversed output indicator, start from (numOfRes-1)
|
||||
// int16_t reverseFillRes; // denote if reverse fill the results in supplementary scan required or not
|
||||
int16_t queryRangeSet; // denote if the query range is set, only available for interval query
|
||||
int16_t lastResRows;
|
||||
int64_t tag;
|
||||
STSCursor cur;
|
||||
int32_t sid; // for retrieve the page id list
|
||||
|
|
|
@ -2473,7 +2473,7 @@ static void validateQueryRangeAndData(SQueryRuntimeEnv *pRuntimeEnv, const TSKEY
|
|||
!QUERY_IS_ASC_QUERY(pQuery)));
|
||||
}
|
||||
|
||||
static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, int64_t *pPrimaryColumn,
|
||||
static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, int64_t *pPrimaryColumn,
|
||||
SField *pFields, __block_search_fn_t searchFn, int32_t *numOfRes,
|
||||
SWindowResInfo *pWindowResInfo) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
@ -2482,7 +2482,6 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *
|
|||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
int32_t forwardStep =
|
||||
getNumOfRowsInTimeWindow(pQuery, pBlockInfo, pPrimaryColumn, pQuery->pos, pQuery->ekey, searchFn, true);
|
||||
|
||||
assert(forwardStep >= 0);
|
||||
|
||||
int32_t newForwardStep = reviseForwardSteps(pRuntimeEnv, forwardStep);
|
||||
|
@ -5220,7 +5219,7 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl
|
|||
*pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_FILE_BLOCK);
|
||||
|
||||
if (blockLoadStatus == DISK_DATA_LOADED) {
|
||||
*forwardStep = applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, pQuery->pFields[pQuery->slot],
|
||||
*forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, pQuery->pFields[pQuery->slot],
|
||||
searchFn, numOfRes, &pRuntimeEnv->windowResInfo);
|
||||
} else {
|
||||
*forwardStep = pblockInfo->size;
|
||||
|
@ -5233,7 +5232,7 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl
|
|||
SCacheBlock *pBlock = getCacheDataBlock(pRuntimeEnv->pMeterObj, pRuntimeEnv, pQuery->slot);
|
||||
*pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK);
|
||||
|
||||
*forwardStep = applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, NULL, searchFn, numOfRes,
|
||||
*forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, NULL, searchFn, numOfRes,
|
||||
&pRuntimeEnv->windowResInfo);
|
||||
|
||||
pSummary->cacheTimeUs += (taosGetTimestampUs() - start);
|
||||
|
@ -6567,7 +6566,6 @@ SMeterQueryInfo *createMeterQueryInfo(STableQuerySupportObj *pSupporter, int32_t
|
|||
pMeterQueryInfo->ekey = ekey;
|
||||
pMeterQueryInfo->lastKey = skey;
|
||||
|
||||
pMeterQueryInfo->lastResRows = 0;
|
||||
pMeterQueryInfo->sid = sid;
|
||||
pMeterQueryInfo->cur.vnodeIndex = -1;
|
||||
|
||||
|
@ -7130,7 +7128,6 @@ int32_t setIntervalQueryExecutionContext(STableQuerySupportObj *pSupporter, int3
|
|||
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
|
||||
assert(pMeterQueryInfo->lastKey > 0);
|
||||
|
||||
// if (IS_MASTER_SCAN(pRuntimeEnv)) {
|
||||
// not enough disk space or memory buffer for intermediate results
|
||||
if (setOutputBufferForIntervalQuery(pRuntimeEnv, pMeterQueryInfo) != TSDB_CODE_SUCCESS) {
|
||||
return -1;
|
||||
|
@ -7185,33 +7182,12 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO
|
|||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
if (pMeterQueryInfo->queryRangeSet) {
|
||||
// assert((QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey >= pQuery->skey) ||
|
||||
// (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey <= pQuery->skey));
|
||||
//
|
||||
// if ((pQuery->ekey < key && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->ekey > key && !QUERY_IS_ASC_QUERY(pQuery)))
|
||||
// {
|
||||
// /*
|
||||
// * last query on this block of the meter is done, start next interval on this block
|
||||
// * otherwise, keep the previous query range and proceed
|
||||
// */
|
||||
// getAlignedIntervalQueryRange(pRuntimeEnv, key, pSupporter->rawSKey, pSupporter->rawEKey);
|
||||
// saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
|
||||
//
|
||||
// // previous query does not be closed, save the results and close it
|
||||
// if (pMeterQueryInfo->lastResRows > 0) {
|
||||
// saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
|
||||
// }
|
||||
// } else {
|
||||
// /* current query not completed, continue. do nothing with respect to query range, */
|
||||
// }
|
||||
// pQuery->lastKey = key;
|
||||
// pMeterQueryInfo->lastKey = key;
|
||||
pQuery->lastKey = key;
|
||||
pMeterQueryInfo->lastKey = key;
|
||||
} else {
|
||||
pQuery->skey = key;
|
||||
STimeWindow win = {.skey = key, pSupporter->rawEKey};
|
||||
|
||||
assert(pMeterQueryInfo->lastResRows == 0);
|
||||
|
||||
// for too small query range, no data in this interval.
|
||||
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->ekey < pQuery->skey)) ||
|
||||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->skey < pQuery->ekey))) {
|
||||
|
@ -7530,7 +7506,7 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) {
|
|||
assert(pQuery->pointsRead <= pQuery->pointsToRead);
|
||||
}
|
||||
|
||||
void applyIntervalQueryOnBlock(STableQuerySupportObj *pSupporter, SMeterDataInfo *pMeterDataInfo,
|
||||
void stableApplyFunctionsOnBlock(STableQuerySupportObj *pSupporter, SMeterDataInfo *pMeterDataInfo,
|
||||
SBlockInfo *pBlockInfo, SField *pFields, __block_search_fn_t searchFn) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
@ -7539,17 +7515,6 @@ void applyIntervalQueryOnBlock(STableQuerySupportObj *pSupporter, SMeterDataInfo
|
|||
|
||||
int64_t *pPrimaryKey = (int64_t *)pRuntimeEnv->primaryColBuffer->data;
|
||||
|
||||
int32_t blockStatus = pRuntimeEnv->blockStatus;
|
||||
|
||||
/*
|
||||
* for each block, we need to handle the previous query, since the determination of previous query being completed
|
||||
* or not is based on the start key of current block.
|
||||
*/
|
||||
TSKEY key = getNextAccessedKeyInData(pQuery, pPrimaryKey, pBlockInfo, blockStatus);
|
||||
if (pQuery->intervalTime > 0) {
|
||||
setIntervalQueryRange(pMeterQueryInfo, pSupporter, key);
|
||||
}
|
||||
|
||||
int32_t forwardStep =
|
||||
getNumOfRowsInTimeWindow(pQuery, pBlockInfo, pPrimaryKey, pQuery->pos, pQuery->ekey, searchFn, true);
|
||||
|
||||
|
@ -7571,14 +7536,6 @@ void applyIntervalQueryOnBlock(STableQuerySupportObj *pSupporter, SMeterDataInfo
|
|||
}
|
||||
}
|
||||
|
||||
// get the true maximum timestamp within the query range to set the correct time window
|
||||
// in the supplementary query
|
||||
// int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
// if ((pQuery->lastKey > pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||
// (pQuery->lastKey < pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||
// pMeterQueryInfo->ekey = pQuery->lastKey - step;
|
||||
// }
|
||||
|
||||
updatelastkey(pQuery, pMeterQueryInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -211,6 +211,7 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) {
|
|||
setExecutionContext(pSupporter, pRuntimeEnv->windowResInfo.pResult, k, pMeterInfo[k].groupIdx,
|
||||
pMeterQueryInfo);
|
||||
} else {
|
||||
setIntervalQueryRange(pMeterQueryInfo, pSupporter, key);
|
||||
int32_t ret = setIntervalQueryExecutionContext(pSupporter, k, pMeterQueryInfo);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
pQInfo->killed = 1;
|
||||
|
@ -230,7 +231,7 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) {
|
|||
pRuntimeEnv->blockStatus);
|
||||
|
||||
totalBlocks++;
|
||||
applyIntervalQueryOnBlock(pSupporter, &pMeterInfo[k], &binfo, NULL, searchFn);
|
||||
stableApplyFunctionsOnBlock(pSupporter, &pMeterInfo[k], &binfo, NULL, searchFn);
|
||||
|
||||
if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) {
|
||||
break;
|
||||
|
@ -431,7 +432,6 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
|
|||
pOneMeterDataInfo->groupIdx, pMeterQueryInfo);
|
||||
} else /* if (pQuery->intervalTime > 0)*/ { // interval query
|
||||
setIntervalQueryRange(pMeterQueryInfo, pSupporter, nextKey);
|
||||
|
||||
ret = setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
tfree(pReqMeterDataInfo); // error code has been set
|
||||
|
@ -440,7 +440,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
|
|||
}
|
||||
}
|
||||
|
||||
applyIntervalQueryOnBlock(pSupporter, pOneMeterDataInfo, &binfo, pInfoEx->pBlock.fields, searchFn);
|
||||
stableApplyFunctionsOnBlock(pSupporter, pOneMeterDataInfo, &binfo, pInfoEx->pBlock.fields, searchFn);
|
||||
}
|
||||
|
||||
tfree(pReqMeterDataInfo);
|
||||
|
|
Loading…
Reference in New Issue