From 7517228dde6571337419bf4852627ed8cb115d4e Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Sun, 29 Sep 2024 14:16:16 +0800 Subject: [PATCH 1/2] fix(stream):fix issue for stream count window state buff --- include/libs/executor/storageapi.h | 2 +- include/libs/stream/streamState.h | 2 +- include/libs/stream/tstreamFileState.h | 2 +- source/libs/executor/inc/streamexecutorInt.h | 2 ++ .../executor/src/streamcountwindowoperator.c | 10 ++++--- source/libs/stream/src/streamSessionState.c | 26 ++++++++++++------- source/libs/stream/src/streamState.c | 4 +-- 7 files changed, 30 insertions(+), 18 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 7a845e43c3..8e88a1a278 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -382,7 +382,7 @@ typedef struct SStateStore { int32_t (*streamStateCountWinAddIfNotExist)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal, int32_t* pVLen, int32_t* pWinCode); - int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen); + int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen); int32_t (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo); diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 46874b7c65..f9469a449d 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -87,7 +87,7 @@ void streamStateFreeVal(void* val); // count window int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal, int32_t* pVLen, int32_t* pWinCode); -int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen); +int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen); SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key); SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 6f1a1b3b98..a265ae7e60 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -110,7 +110,7 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch // count window int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen, int32_t* pWinCode); -int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen); +int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen); // function int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen, diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index c6053cc96e..ab00dceb20 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -26,6 +26,8 @@ void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type); bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo); void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo); +void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 8b2a1b7c71..33b3e7748c 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -90,7 +90,7 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group if (isSlidingCountWindow(pAggSup)) { if (pBuffInfo->winBuffOp == CREATE_NEW_WINDOW) { - code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, + code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size); QUERY_CHECK_CODE(code, lino, _end); @@ -101,9 +101,11 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin, (void**)&pCurWin->winInfo.pStatePos, &size); if (winCode == TSDB_CODE_FAILED) { - code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, + code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size); QUERY_CHECK_CODE(code, lino, _end); + } else { + reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore); } } else { pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin, @@ -111,9 +113,11 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin, (void**)&pCurWin->winInfo.pStatePos, &size); if (winCode == TSDB_CODE_FAILED) { - code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, + code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size); QUERY_CHECK_CODE(code, lino, _end); + } else { + reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore); } } if (ts < pCurWin->winInfo.sessionWin.win.ekey) { diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 23598cf717..7e3d8d59f9 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -1060,7 +1060,7 @@ _end: return code; } -int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) { +int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) { SSessionKey* pWinKey = pKey; const TSKEY gap = 0; int32_t code = TSDB_CODE_SUCCESS; @@ -1082,21 +1082,27 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey int32_t size = taosArrayGetSize(pWinStates); if (size == 0) { void* pFileStore = getStateFileStore(pFileState); - void* p = NULL; + void* pRockVal = NULL; - int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &p, pVLen); + int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen); if (code_file == TSDB_CODE_SUCCESS && isFlushedState(pFileState, endTs, 0)) { - (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); - if (!(*pVal)) { - code = TSDB_CODE_OUT_OF_MEMORY; + int32_t valSize = *pVLen; + COUNT_TYPE* pWinStateCount = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE))); + if ((*pWinStateCount) == winCount) { + code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal); QUERY_CHECK_CODE(code, lino, _end); - } - - qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, + } else { + (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen); + if (!(*pVal)) { + code = TSDB_CODE_OUT_OF_MEMORY; + QUERY_CHECK_CODE(code, lino, _end); + } + qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); + } } else { code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal); - taosMemoryFree(p); + taosMemoryFree(pRockVal); QUERY_CHECK_CODE(code, lino, _end); } } else { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 1801c6e029..1994c882aa 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -545,6 +545,6 @@ int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey return getCountWinResultBuff(pState->pFileState, pKey, winCount, ppVal, pVLen, pWinCode); } -int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) { - return createCountWinResultBuff(pState->pFileState, pKey, pVal, pVLen); +int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) { + return createCountWinResultBuff(pState->pFileState, pKey, winCount, pVal, pVLen); } From 2c0445fff47b1e25a507f61258454440b8338134 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Sun, 29 Sep 2024 14:53:47 +0800 Subject: [PATCH 2/2] add check --- source/libs/stream/src/tstreamFileState.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index f7a88746ec..cf5f1b2b91 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -445,7 +445,9 @@ _end: } int32_t clearRowBuff(SStreamFileState* pFileState) { - clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false); + if (pFileState->deleteMark != INT64_MAX) { + clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false); + } if (isListEmpty(pFileState->freeBuffs)) { return flushRowBuff(pFileState); }