diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 1445f4c26e..b7f100733b 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -80,6 +80,7 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key); void streamStateFreeCur(SStreamStateCur* pCur); +void streamStateResetCur(SStreamStateCur* pCur); int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 05391df671..c507fadeca 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2680,6 +2680,9 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo); + if (!IS_VALID_SESSION_WIN(winInfo)) { + continue; + } int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true); if (winNum > 0) { qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, winInfo.sessionWin.win.skey, diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 8bb0de6a1a..63274362c6 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -266,11 +266,11 @@ SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, pCur->pStreamFileState = pFileState; return pCur; } -static void transformCursor(SStreamFileState* pFileState, SStreamStateCur** ppCur) { - SStreamStateCur* pCur = *ppCur; - streamStateFreeCur(pCur); - pCur = createStreamStateCursor(); - (*ppCur) = pCur; +static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) { + if (!pCur) { + return; + } + streamStateResetCur(pCur); pCur->buffIndex = 0; pCur->pStreamFileState = pFileState; } @@ -278,12 +278,14 @@ static void transformCursor(SStreamFileState* pFileState, SStreamStateCur** ppCu static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates, SStreamStateCur** ppCur) { SSessionKey key = {.groupId = groupId}; int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL); - if (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0) { - transformCursor(pFileState, ppCur); - } else { - SStreamStateCur* pCur = *ppCur; - pCur->buffIndex = -1; - pCur->pStreamFileState = pFileState; + if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) { + if ( !(*ppCur) ) { + (*ppCur) = createStreamStateCursor(); + } + transformCursor(pFileState, *ppCur); + } else if (*ppCur) { + (*ppCur)->buffIndex = -1; + (*ppCur)->pStreamFileState = pFileState; } } @@ -323,6 +325,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void if (!pCur) { return TSDB_CODE_FAILED; } + int32_t code = TSDB_CODE_SUCCESS; SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState); void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); @@ -342,20 +345,25 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void } *pKey = *(SSessionKey*)(pPos->pKey); } else { - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); - if (code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0) { - transformCursor(pCur->pStreamFileState, &pCur); - if (pCur->buffIndex >= size) { - return TSDB_CODE_FAILED; - } + void* pData = NULL; + code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, &pData, pVLen); + if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0)) { + transformCursor(pCur->pStreamFileState, pCur); SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex); if (pVal) { *pVal = pPos; } *pKey = *(SSessionKey*)(pPos->pKey); + code = TSDB_CODE_SUCCESS; + } else if (code == TSDB_CODE_SUCCESS && pVal) { + SRowBuffPos* pNewPos = getNewRowPosForWrite(pCur->pStreamFileState); + pNewPos->needFree = true; + memcpy(pNewPos->pRowBuff, pData, *pVLen); + (*pVal) = pNewPos; } + taosMemoryFreeClear(pData); } - return TSDB_CODE_SUCCESS; + return code; } int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur) { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 68ba8890ce..9b16d5ecc2 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -678,17 +678,29 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { return tdbTbcMoveToPrev(pCur->pCur); #endif } + +void streamStateResetCur(SStreamStateCur* pCur) { + if (!pCur) { + return; + } + if (pCur->iter) rocksdb_iter_destroy(pCur->iter); + if (pCur->snapshot) rocksdb_release_snapshot(pCur->db, pCur->snapshot); + if (pCur->readOpt) rocksdb_readoptions_destroy(pCur->readOpt); + + tdbTbcClose(pCur->pCur); + + memset(pCur, 0, sizeof(SStreamStateCur)); + + pCur->buffIndex = -1; +} + void streamStateFreeCur(SStreamStateCur* pCur) { if (!pCur || pCur->buffIndex >= 0) { taosMemoryFree(pCur); return; } qDebug("streamStateFreeCur"); - rocksdb_iter_destroy(pCur->iter); - if (pCur->snapshot) rocksdb_release_snapshot(pCur->db, pCur->snapshot); - rocksdb_readoptions_destroy(pCur->readOpt); - - tdbTbcClose(pCur->pCur); + streamStateResetCur(pCur); taosMemoryFree(pCur); }