From 3214157694be09bdf3a22e6e423aad377677a7f2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 27 Sep 2024 11:17:03 +0800 Subject: [PATCH] valid iter --- source/libs/stream/src/streamBackendRocksdb.c | 9 +++++++++ source/libs/stream/src/streamState.c | 16 ++++++++-------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 7a8be34781..a2c9012df5 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3657,6 +3657,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta pCur->db = wrapper->db; pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); + if (pCur->iter == NULL) { + streamStateFreeCur(pCur); + return NULL; + } char buf[128] = {0}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; @@ -3679,6 +3683,11 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey)); if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur; + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } + rocksdb_iter_prev(pCur->iter); if (!rocksdb_iter_valid(pCur->iter)) { streamStateFreeCur(pCur); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 41ff8f3c24..1801c6e029 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -99,8 +99,8 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { } SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); stDebug("open stream state %p, %s", pState, path); @@ -170,12 +170,12 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* int32_t lino = 0; void* pVal = NULL; int32_t len = getRowStateRowSize(pState->pFileState); - int32_t tmpLen = len; + int32_t tmpLen = len; code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &tmpLen); QUERY_CHECK_CODE(code, lino, _end); - char* buf = ((SRowBuffPos*)pVal)->pRowBuff; - int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); memcpy(buf + len - rowSize, value, vLen); _end: @@ -189,12 +189,12 @@ int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVa int32_t lino = 0; void* pVal = NULL; int32_t len = getRowStateRowSize(pState->pFileState); - int32_t tmpLen = len; + int32_t tmpLen = len; code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &tmpLen); QUERY_CHECK_CODE(code, lino, _end); - char* buf = ((SRowBuffPos*)pVal)->pRowBuff; - int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); *ppVal = buf + len - rowSize; streamStateReleaseBuf(pState, pVal, false);