From 01e8fc583d19fb116f8c595f4db5399d9b43aa02 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 29 Aug 2024 14:52:42 +0800 Subject: [PATCH 1/2] fix(query):adj error code for aggretate operator --- source/libs/executor/src/aggregateoperator.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index b5a3f2f484..fe82e0eb62 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -193,8 +193,9 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { pAggInfo->pNewGroupBlock = NULL; tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable); setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); - QUERY_CHECK_CODE(code, lino, _end); code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true); + QUERY_CHECK_CODE(code, lino, _end); + code = doAggregateImpl(pOperator, pSup->pCtx); QUERY_CHECK_CODE(code, lino, _end); } From 925b28894515360025a0d6b0bee3e4bd01f2f69b Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 29 Aug 2024 15:55:11 +0800 Subject: [PATCH 2/2] limit the number of stream results --- .../executor/src/streamtimewindowoperator.c | 38 +++++++++++++++---- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 0651e2dbf6..6a1a5942d6 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -49,6 +49,8 @@ #define STREAM_SESSION_OP_CHECKPOINT_NAME "StreamSessionOperator_Checkpoint" #define STREAM_STATE_OP_CHECKPOINT_NAME "StreamStateOperator_Checkpoint" +#define MAX_STREAM_HISTORY_RESULT 100000000 + typedef struct SStateWindowInfo { SResultWindowInfo winInfo; SStateKeys* pStateKey; @@ -161,11 +163,19 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) { } int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) { + if (tSimpleHashGetSize(pStUpdated) > MAX_STREAM_HISTORY_RESULT) { + qError("%s failed at line %d since too many history result. ", __func__, __LINE__); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } winInfo.sessionWin.win.ekey = winInfo.sessionWin.win.skey; return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); } static int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) { + if (tSimpleHashGetSize(pUpdatedMap) > MAX_STREAM_HISTORY_RESULT) { + qError("%s failed at line %d since too many history result. ", __func__, __LINE__); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), &pPos, POINTER_BYTES); } @@ -481,6 +491,12 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pMidRetriveRes); blockDataDestroy(pInfo->pMidPulloverRes); + if (pInfo->pUpdatedMap != NULL) { + tSimpleHashSetFreeFp(pInfo->pUpdatedMap, destroyFlusedppPos); + tSimpleHashCleanup(pInfo->pUpdatedMap); + pInfo->pUpdatedMap = NULL; + } + if (pInfo->stateStore.streamFileStateDestroy != NULL) { pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); } @@ -495,11 +511,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { nodesDestroyNode((SNode*)pInfo->pPhyNode); colDataDestroy(&pInfo->twAggSup.timeWindowData); cleanupExprSupp(&pInfo->scalarSupp); - if (pInfo->pUpdatedMap != NULL) { - tSimpleHashSetFreeFp(pInfo->pUpdatedMap, destroyFlusedppPos); - tSimpleHashCleanup(pInfo->pUpdatedMap); - pInfo->pUpdatedMap = NULL; - } tSimpleHashCleanup(pInfo->pDeletedMap); blockDataDestroy(pInfo->pCheckpointRes); @@ -994,7 +1005,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; } -static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId, +static int32_t doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId, SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -1166,6 +1177,7 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } + return code; } static inline int winPosCmprImpl(const void* pKey1, const void* pKey2) { @@ -1718,7 +1730,12 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc code = setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); QUERY_CHECK_CODE(code, lino, _end); - doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap); + code = doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap); + if (code == TSDB_CODE_STREAM_INTERNAL_ERROR) { + code = TSDB_CODE_SUCCESS; + pOperator->status = OP_RES_TO_RETURN; + break; + } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); @@ -5184,7 +5201,12 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p } #endif - doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap); + code = doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap); + if (code == TSDB_CODE_STREAM_INTERNAL_ERROR) { + pOperator->status = OP_RES_TO_RETURN; + code = TSDB_CODE_SUCCESS; + break; + } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); }