From 367b3b153fa717c0ef7b7c69ee30180e4cec5e62 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 13 Jun 2024 15:35:26 +0800 Subject: [PATCH] check cursor for count window --- source/libs/stream/src/streamSessionState.c | 12 +++++++----- source/libs/stream/src/streamState.c | 1 + 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 4c61e6da1d..61f44e9b79 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -516,24 +516,25 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS return pBuffCur; } winCount = *((COUNT_TYPE*) ((char*)pVal + (resSize - sizeof(COUNT_TYPE)))); + taosMemoryFreeClear(pVal); + streamStateFreeCur(pBuffCur); if (sessionRangeKeyCmpr(pWinKey, &key) != 0 && winCount == count) { - streamStateFreeCur(pCur); - return pBuffCur; + streamStateCurNext(pFileStore, pCur); + return pCur; } streamStateCurPrev(pFileStore, pCur); while (1) { code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &len); if (code == TSDB_CODE_FAILED) { streamStateCurNext(pFileStore, pCur); - streamStateFreeCur(pBuffCur); return pCur; } winCount = *((COUNT_TYPE*) ((char*)pVal + (resSize - sizeof(COUNT_TYPE)))); + taosMemoryFreeClear(pVal); if (sessionRangeKeyCmpr(pWinKey, &key) == 0 || winCount < count) { streamStateCurPrev(pFileStore, pCur); } else { streamStateCurNext(pFileStore, pCur); - streamStateFreeCur(pBuffCur); return pCur; } } @@ -568,7 +569,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void void* pData = NULL; code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, &pData, pVLen); if (taosArrayGetSize(pWinStates) > 0 && - (code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0)) { + (code == TSDB_CODE_FAILED || sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0)) { transformCursor(pCur->pStreamFileState, pCur); SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex); if (pVal) { @@ -590,6 +591,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void } int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur) { + qDebug("move cursor to next"); if (pCur && pCur->buffIndex >= 0) { pCur->buffIndex++; } else { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 47324bd8c9..38d6a5c372 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -654,6 +654,7 @@ int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { #ifdef USE_ROCKSDB + qDebug("move cursor to next"); return streamStateCurPrev_rocksdb(pCur); #else if (!pCur) {