Merge pull request #5675 from taosdata/feature/qrefactor
[td-3662] <fix>: fix invalid write caused by top/bottom query.
This commit is contained in:
commit
09297fba5d
|
@ -129,6 +129,8 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
|
|||
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
|
||||
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
|
||||
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
|
||||
bool tscIsTopbotQuery(SQueryInfo* pQueryInfo);
|
||||
int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo);
|
||||
|
||||
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
|
||||
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
|
||||
|
|
|
@ -338,11 +338,20 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
|
|||
pReducer->resColModel->capacity = pReducer->nResultBufSize;
|
||||
pReducer->finalModel = pFFModel;
|
||||
|
||||
int32_t expandFactor = 1;
|
||||
if (finalmodel->rowSize > 0) {
|
||||
pReducer->resColModel->capacity /= finalmodel->rowSize;
|
||||
bool topBotQuery = tscIsTopbotQuery(pQueryInfo);
|
||||
if (topBotQuery) {
|
||||
expandFactor = tscGetTopbotQueryParam(pQueryInfo);
|
||||
pReducer->resColModel->capacity /= (finalmodel->rowSize * expandFactor);
|
||||
pReducer->resColModel->capacity *= expandFactor;
|
||||
} else {
|
||||
pReducer->resColModel->capacity /= finalmodel->rowSize;
|
||||
}
|
||||
}
|
||||
|
||||
assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pReducer->rowSize);
|
||||
|
||||
pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity);
|
||||
|
||||
if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL ||
|
||||
|
@ -1440,6 +1449,11 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
|
|||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
tFilePage *tmpBuffer = pLocalMerge->pTempBuffer;
|
||||
|
||||
int32_t remain = 1;
|
||||
if (tscIsTopbotQuery(pQueryInfo)) {
|
||||
remain = tscGetTopbotQueryParam(pQueryInfo);
|
||||
}
|
||||
|
||||
if (doHandleLastRemainData(pSql)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1528,7 +1542,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
|
|||
* if the previous group does NOT generate any result (pResBuf->num == 0),
|
||||
* continue to process results instead of return results.
|
||||
*/
|
||||
if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalMerge->resColModel->capacity)) {
|
||||
if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num + remain >= pLocalMerge->resColModel->capacity)) {
|
||||
// does not belong to the same group
|
||||
bool notSkipped = genFinalResults(pSql, pLocalMerge, !sameGroup);
|
||||
|
||||
|
|
|
@ -271,6 +271,41 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
|
|||
return false;
|
||||
}
|
||||
|
||||
bool tscIsTopbotQuery(SQueryInfo* pQueryInfo) {
|
||||
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
|
||||
for (int32_t i = 0; i < numOfExprs; ++i) {
|
||||
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
|
||||
if (pExpr == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t functionId = pExpr->functionId;
|
||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo) {
|
||||
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
|
||||
for (int32_t i = 0; i < numOfExprs; ++i) {
|
||||
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
|
||||
if (pExpr == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t functionId = pExpr->functionId;
|
||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
|
||||
return (int32_t) pExpr->param[0].i64;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
|
||||
if (!tscIsPointInterpQuery(pQueryInfo)) {
|
||||
return;
|
||||
|
|
|
@ -2631,6 +2631,21 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
|||
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis);
|
||||
|
||||
if (pQuery->topBotQuery && pBlock->pBlockStatis != NULL) {
|
||||
{ // set previous window
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
SResultRow* pResult = NULL;
|
||||
|
||||
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
|
||||
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
|
||||
|
||||
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQuery);
|
||||
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
|
||||
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
|
||||
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
}
|
||||
bool load = false;
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
int32_t functionId = pTableScanInfo->pCtx[i].functionId;
|
||||
|
|
|
@ -159,6 +159,14 @@ if $data03 != @abc15@ then
|
|||
return -1
|
||||
endi
|
||||
|
||||
#sql select top(c6, 3) from select_tags_mt0 interval(10a)
|
||||
#sql select top(c6, 3) from select_tags_mt0 interval(10a) group by tbname;
|
||||
|
||||
sql select top(c6, 10) from select_tags_mt0 interval(10a);
|
||||
if $rows != 12800 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select top(c1, 100), tbname, t1, t2 from select_tags_mt0;
|
||||
if $rows != 100 then
|
||||
return -1
|
||||
|
|
Loading…
Reference in New Issue