From 5cc1ae1266b4f6b760e58d34cb7a7c90eff71a31 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 22 Sep 2023 19:25:19 +0800 Subject: [PATCH] fix issue --- include/libs/stream/tstreamFileState.h | 1 + source/libs/stream/src/streamSessionState.c | 58 ++++++++++++++++----- source/libs/stream/src/streamState.c | 16 +++--- source/libs/stream/src/tstreamFileState.c | 2 +- 4 files changed, 56 insertions(+), 21 deletions(-) diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 36f4274faa..87256f7496 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -73,6 +73,7 @@ int32_t getRowStateRowSize(SStreamFileState* pFileState); // session window int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen); +int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen); int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen); diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 63274362c6..17e7509c4e 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -88,29 +88,28 @@ static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* return pNewPos; } -int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen) { - SSessionKey* pWinKey = key; +int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen) { SSHashObj* pSessionBuff = getRowStateBuff(pFileState); SArray* pWinStates = NULL; - void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); + void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); if (ppBuff) { pWinStates = (SArray*)(*ppBuff); } else { pWinStates = taosArrayInit(16, POINTER_BYTES); - tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); + tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); } - TSKEY startTs = pWinKey->win.skey; - TSKEY endTs = pWinKey->win.ekey; + TSKEY startTs = pKey->win.skey; + TSKEY endTs = pKey->win.ekey; int32_t size = taosArrayGetSize(pWinStates); if (size == 0) { - (*pVal) = addNewSessionWindow(pFileState, pWinStates, key); + (*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey); goto _end; } - // find the first position which is smaller than the pWinKey - int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare); + // find the first position which is smaller than the pKey + int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare); SRowBuffPos* pPos = NULL; if (index >= 0) { @@ -118,7 +117,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, if (inSessionWindow(pPos->pKey, startTs, gap)) { (*pVal) = pPos; SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; - *key = *pDestWinKey; + *pKey = *pDestWinKey; goto _end; } } @@ -128,7 +127,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ) { (*pVal) = pPos; SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; - *key = *pDestWinKey; + *pKey = *pDestWinKey; goto _end; } } @@ -137,7 +136,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs)) { void* p = NULL; void* pFileStore = getStateFileStore(pFileState); - int32_t code = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pWinKey, gap, &p, pVLen); + int32_t code = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen); SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); pNewPos->needFree = true; @@ -152,15 +151,46 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, } if (index == size - 1) { - (*pVal) = addNewSessionWindow(pFileState, pWinStates, key); + (*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey); goto _end; } - (*pVal) = insertNewSessionWindow(pFileState, pWinStates, key, index + 1); + (*pVal) = insertNewSessionWindow(pFileState, pWinStates, pKey, index + 1); _end: return (*pVal) != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED; } +int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { + SSHashObj* pSessionBuff = getRowStateBuff(pFileState); + SSessionKey* pKey = pPos->pKey; + SArray* pWinStates = NULL; + void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); + if (ppBuff) { + pWinStates = (SArray*)(*ppBuff); + } else { + pWinStates = taosArrayInit(16, POINTER_BYTES); + tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); + } + + int32_t size = taosArrayGetSize(pWinStates); + if (size == 0) { + taosArrayPush(pWinStates, &pPos); + goto _end; + } + + // find the first position which is smaller than the pKey + int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare); + if (index >= 0) { + taosArrayInsert(pWinStates, index, &pPos); + } else { + taosArrayInsert(pWinStates, 0, &pPos); + } + +_end: + pPos->needFree = false; + return TSDB_CODE_SUCCESS; +} + int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) { SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); pNewPos->needFree = true; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 9b16d5ecc2..042cce831c 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -714,15 +714,19 @@ void streamStateFreeVal(void* val) { int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen) { #ifdef USE_ROCKSDB + int32_t code = TSDB_CODE_SUCCESS; SRowBuffPos* pos = (SRowBuffPos*)value; if (pos->needFree) { - int32_t code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen); - streamStateReleaseBuf(pState, pos, true); - qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey, - key->win.ekey, key->groupId, code); - return code; + if (isFlushedState(pState->pFileState, key->win.ekey)) { + code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen); + streamStateReleaseBuf(pState, pos, true); + qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey, + key->win.ekey, key->groupId, code); + } else { + code = putSessionWinResultBuff(pState->pFileState, value); + } } - return TSDB_CODE_SUCCESS; + return code; #else SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index a13cb33042..b7a8ed60fb 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -660,7 +660,7 @@ void* getStateFileStore(SStreamFileState* pFileState) { } bool isDeteled(SStreamFileState* pFileState, TSKEY ts) { - return ts < (pFileState->maxTs - pFileState->deleteMark); + return pFileState->deleteMark > 0 && ts < (pFileState->maxTs - pFileState->deleteMark); } bool isFlushedState(SStreamFileState* pFileState, TSKEY ts) {