From 8c6ea4079c0c72918c8494d54f82f55a78454451 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 22 Feb 2024 14:15:35 +0800 Subject: [PATCH] delete invalid result --- .../executor/src/streamcountwindowoperator.c | 16 +++++++++++++--- source/libs/stream/src/streamSessionState.c | 3 +-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 080f9d4e2b..689ba54ca6 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -115,8 +115,16 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, } } -static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, int32_t start, int32_t rows, int32_t maxRows, - SSHashObj* pStDeleted, bool* pRebuild) { +static void removeCountResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) { + SSessionKey key = {0}; + getSessionHashKey(pKey, &key); + tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey)); + tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey)); +} + +static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, + int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated, + SSHashObj* pStDeleted, bool* pRebuild) { SSessionKey sWinKey = pWinInfo->winInfo.sessionWin; int32_t num = 0; for (int32_t i = start; i < rows; i++) { @@ -148,6 +156,7 @@ static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowI if (needDelState) { memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey)); + removeCountResult(pStUpdated, pAggSup->pResultRows, &sWinKey); if (pWinInfo->winInfo.pStatePos->needFree) { pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey); } @@ -242,7 +251,8 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl setSessionWinOutputInfo(pStUpdated, &curWin.winInfo); slidingRows = *curWin.pWindowCount; if (!buffInfo.rebuildWindow) { - winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStDeleted, &buffInfo.rebuildWindow); + winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated, + pStDeleted, &buffInfo.rebuildWindow); } if (buffInfo.rebuildWindow) { SSessionKey range = {0}; diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index bd28d2bca9..e2aae130e5 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -736,6 +736,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C void* pRockVal = NULL; SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen); + streamStateFreeCur(pCur); if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code); if (code == TSDB_CODE_SUCCESS) { @@ -743,7 +744,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) ); if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) { (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen); - streamStateFreeCur(pCur); goto _end; } } @@ -751,7 +751,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C pWinKey->win.ekey = endTs; (*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL); taosMemoryFree(pRockVal); - streamStateFreeCur(pCur); } else { (*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey); code = TSDB_CODE_FAILED;