stream event window mem leak

This commit is contained in:
54liuyao 2024-03-13 16:16:44 +08:00
parent 9ba7221910
commit c13338d965
3 changed files with 10 additions and 9 deletions

View File

@ -99,6 +99,11 @@ int32_t getEndCondIndex(bool* pEnd, int32_t start, int32_t rows) {
return -1; return -1;
} }
int32_t reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
pAPI->streamStateReleaseBuf(pState, pPos, true);
return TSDB_CODE_SUCCESS;
}
void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd, int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) { void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd, int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t size = pAggSup->resultRowSize; int32_t size = pAggSup->resultRowSize;
@ -143,6 +148,7 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI
pCurWin->winInfo.isOutput = false; pCurWin->winInfo.isOutput = false;
_end: _end:
reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur); pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
pNextWinKey->groupId = groupId; pNextWinKey->groupId = groupId;
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, pNextWinKey, NULL, 0); code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, pNextWinKey, NULL, 0);
@ -341,6 +347,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
} }
if (isWindowIncomplete(&curWin)) { if (isWindowIncomplete(&curWin)) {
releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAggSup->stateStore);
continue; continue;
} }

View File

@ -1845,11 +1845,6 @@ int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
pAPI->streamStateReleaseBuf(pState, pPos, true);
return TSDB_CODE_SUCCESS;
}
void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) { void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) {
SSessionKey key = {0}; SSessionKey key = {0};
getSessionHashKey(pKey, &key); getSessionHashKey(pKey, &key);
@ -2495,7 +2490,7 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) {
return; return;
} }
SResultWindowInfo* pWinInfo = taosArrayGet(pAllWins, size - 1); SResultWindowInfo* pWinInfo = taosArrayGet(pAllWins, size - 1);
SSessionKey* pSeKey = pWinInfo->pStatePos->pKey; SSessionKey* pSeKey = &pWinInfo->sessionWin;
taosArrayPush(pMaxWins, pSeKey); taosArrayPush(pMaxWins, pSeKey);
if (pSeKey->groupId == 0) { if (pSeKey->groupId == 0) {
return; return;
@ -2503,7 +2498,7 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) {
uint64_t preGpId = pSeKey->groupId; uint64_t preGpId = pSeKey->groupId;
for (int32_t i = size - 2; i >= 0; i--) { for (int32_t i = size - 2; i >= 0; i--) {
pWinInfo = taosArrayGet(pAllWins, i); pWinInfo = taosArrayGet(pAllWins, i);
pSeKey = pWinInfo->pStatePos->pKey; pSeKey = &pWinInfo->sessionWin;
if (preGpId != pSeKey->groupId) { if (preGpId != pSeKey->groupId) {
taosArrayPush(pMaxWins, pSeKey); taosArrayPush(pMaxWins, pSeKey);
preGpId = pSeKey->groupId; preGpId = pSeKey->groupId;

View File

@ -310,7 +310,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream
int32_t size = taosArrayGetSize(pWinStates); int32_t size = taosArrayGetSize(pWinStates);
if (pCur->buffIndex >= 0) { if (pCur->buffIndex >= 0) {
if (pCur->buffIndex >= size) { if (pCur->buffIndex >= size) {
pNewPos = insertNewSessionWindow(pFileState, pWinStates, pWinKey, size); pNewPos = addNewSessionWindow(pFileState, pWinStates, pWinKey);
goto _end; goto _end;
} }
pNewPos = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex); pNewPos = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex);
@ -332,7 +332,6 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream
} }
_end: _end:
memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
(*ppVal) = pNewPos; (*ppVal) = pNewPos;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }