Merge pull request #27548 from taosdata/fix/ly_res

fix(stream):limit the number of stream results
This commit is contained in:
Haojun Liao 2024-08-29 19:29:16 +08:00 committed by GitHub
commit 0a7386cba5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 32 additions and 9 deletions

View File

@ -193,8 +193,9 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
pAggInfo->pNewGroupBlock = NULL;
tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
QUERY_CHECK_CODE(code, lino, _end);
code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
QUERY_CHECK_CODE(code, lino, _end);
code = doAggregateImpl(pOperator, pSup->pCtx);
QUERY_CHECK_CODE(code, lino, _end);
}

View File

@ -49,6 +49,8 @@
#define STREAM_SESSION_OP_CHECKPOINT_NAME "StreamSessionOperator_Checkpoint"
#define STREAM_STATE_OP_CHECKPOINT_NAME "StreamStateOperator_Checkpoint"
#define MAX_STREAM_HISTORY_RESULT 100000000
typedef struct SStateWindowInfo {
SResultWindowInfo winInfo;
SStateKeys* pStateKey;
@ -161,11 +163,19 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
}
int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
if (tSimpleHashGetSize(pStUpdated) > MAX_STREAM_HISTORY_RESULT) {
qError("%s failed at line %d since too many history result. ", __func__, __LINE__);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
winInfo.sessionWin.win.ekey = winInfo.sessionWin.win.skey;
return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
}
static int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) {
if (tSimpleHashGetSize(pUpdatedMap) > MAX_STREAM_HISTORY_RESULT) {
qError("%s failed at line %d since too many history result. ", __func__, __LINE__);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), &pPos, POINTER_BYTES);
}
@ -481,6 +491,12 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
blockDataDestroy(pInfo->pDelRes);
blockDataDestroy(pInfo->pMidRetriveRes);
blockDataDestroy(pInfo->pMidPulloverRes);
if (pInfo->pUpdatedMap != NULL) {
tSimpleHashSetFreeFp(pInfo->pUpdatedMap, destroyFlusedppPos);
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
}
if (pInfo->stateStore.streamFileStateDestroy != NULL) {
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
}
@ -495,11 +511,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
nodesDestroyNode((SNode*)pInfo->pPhyNode);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupExprSupp(&pInfo->scalarSupp);
if (pInfo->pUpdatedMap != NULL) {
tSimpleHashSetFreeFp(pInfo->pUpdatedMap, destroyFlusedppPos);
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
}
tSimpleHashCleanup(pInfo->pDeletedMap);
blockDataDestroy(pInfo->pCheckpointRes);
@ -994,7 +1005,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; }
static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId,
static int32_t doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId,
SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -1166,6 +1177,7 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
}
return code;
}
static inline int winPosCmprImpl(const void* pKey1, const void* pKey2) {
@ -1718,7 +1730,12 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
code = setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
QUERY_CHECK_CODE(code, lino, _end);
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap);
code = doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap);
if (code == TSDB_CODE_STREAM_INTERNAL_ERROR) {
code = TSDB_CODE_SUCCESS;
pOperator->status = OP_RES_TO_RETURN;
break;
}
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
@ -5184,7 +5201,12 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
}
#endif
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap);
code = doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap);
if (code == TSDB_CODE_STREAM_INTERNAL_ERROR) {
pOperator->status = OP_RES_TO_RETURN;
code = TSDB_CODE_SUCCESS;
break;
}
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
}