From ca7e1344e192887568b1ddd503e8ed47fb19d816 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 3 Jun 2024 15:20:05 +0800 Subject: [PATCH] adj stream count state buff --- source/libs/stream/src/streamSessionState.c | 52 ++++++++++++++------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 687b4bcf12..4c61e6da1d 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -734,6 +734,21 @@ _end: return code; } +int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) { + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey); + int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + streamStateFreeCur(pCur); + if (code == TSDB_CODE_SUCCESS) { + return code; + } else { + pCur = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey); + } + + code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + streamStateFreeCur(pCur); + return code; +} + int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) { SSessionKey* pWinKey = pKey; const TSKEY gap = 0; @@ -755,14 +770,13 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C if (size == 0) { void* pFileStore = getStateFileStore(pFileState); void* pRockVal = NULL; - SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); - code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen); - streamStateFreeCur(pCur); + code = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen); if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { - qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code); + qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, + pWinKey->win.ekey, code); if (code == TSDB_CODE_SUCCESS) { int32_t valSize = *pVLen; - COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) ); + COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE))); if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) { (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen); goto _end; @@ -798,20 +812,24 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C } if (index == -1) { - if (!isDeteled(pFileState, endTs)) { - void* p = NULL; - void* pFileStore = getStateFileStore(pFileState); - SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); - int32_t code_file = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &p, pVLen); + if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) { + SSessionKey tmpKey = *pWinKey; + void* pRockVal = NULL; + void* pFileStore = getStateFileStore(pFileState); + int32_t code_file = getCountWinStateFromDisc(pFileStore, &tmpKey, &pRockVal, pVLen); if (code_file == TSDB_CODE_SUCCESS) { - (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); - code = code_file; - qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); - streamStateFreeCur(pCur); - goto _end; + SRowBuffPos* pFirstPos = taosArrayGetP(pWinStates, 0); + SSessionKey* pFirstWinKey = (SSessionKey*)pFirstPos->pKey; + if (tmpKey.win.ekey < pFirstWinKey->win.skey) { + *pWinKey = tmpKey; + (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen); + code = code_file; + qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, + pWinKey->win.ekey, code_file); + goto _end; + } } - taosMemoryFree(p); - streamStateFreeCur(pCur); + taosMemoryFree(pRockVal); } }