From caa45033526d8a29293d3be5b55d9664d0f0dc30 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 18 Jul 2023 18:02:39 +0800 Subject: [PATCH] reset state key memory --- source/libs/executor/src/timewindowoperator.c | 37 ++++++++++++++++--- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c4111ded92..51da73d4d7 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4036,7 +4036,7 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) { bool compareStateKey(void* data, void* key) { if (!data || !key) { - return true; + return false; } SStateKeys* stateKey = (SStateKeys*)key; stateKey->pData = (char*)key + sizeof(SStateKeys); @@ -4062,7 +4062,13 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->winInfo.sessionWin.win)) { code = TSDB_CODE_FAILED; releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); - pCurWin->winInfo.pOutputBuf = taosMemoryMalloc(size); + pCurWin->winInfo.pOutputBuf = taosMemoryCalloc(1, size); + pCurWin->pStateKey = + (SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); + pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); + pCurWin->pStateKey->type = pAggSup->stateKeyType; + pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys); + pCurWin->pStateKey->isNull = false; } if (code == TSDB_CODE_SUCCESS) { @@ -4076,11 +4082,19 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, } pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin; - pNextWin->winInfo.pOutputBuf = NULL; - SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin); - code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, NULL, 0); + SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin); + int32_t nextSize = pAggSup->resultRowSize; + code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, &pNextWin->winInfo.pOutputBuf, &nextSize); if (code != TSDB_CODE_SUCCESS) { SET_SESSION_WIN_INVALID(pNextWin->winInfo); + } else { + pNextWin->pStateKey = + (SStateKeys*)((char*)pNextWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); + pNextWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); + pNextWin->pStateKey->type = pAggSup->stateKeyType; + pNextWin->pStateKey->pData = (char*)pNextWin->pStateKey + sizeof(SStateKeys); + pNextWin->pStateKey->isNull = false; + pNextWin->winInfo.isOutput = true; } pAggSup->stateStore.streamStateFreeCur(pCur); } @@ -4156,6 +4170,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl SStateWindowInfo curWin = {0}; SStateWindowInfo nextWin = {0}; setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin); + if (IS_VALID_SESSION_WIN(nextWin.winInfo)) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextWin.winInfo.pOutputBuf, &pAPI->stateStore); + } setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); winRows = updateStateWindowInfo(&curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual, pAggSup->pResultRows, pSeUpdated, pStDeleted); @@ -4346,9 +4363,19 @@ void streamStateReloadState(SOperatorInfo* pOperator) { for (int32_t i = 0; i < num; i++) { SStateWindowInfo curInfo = {0}; SStateWindowInfo nextInfo = {0}; + SStateWindowInfo dummy = {0}; setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo); if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) { compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted); + saveResult(curInfo.winInfo, pInfo->pStUpdated); + } + + if (IS_VALID_SESSION_WIN(curInfo.winInfo)) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)curInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); + } + + if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); } } taosMemoryFree(pBuf);