From 18f988283438acddaf874d8b95a6e6bc5b1c3edb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 13 Jul 2023 16:06:23 +0800 Subject: [PATCH] fix recover error --- source/libs/stream/src/streamBackendRocksdb.c | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index da670d4da1..6012051fd8 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1491,9 +1491,13 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons return -1; } - size_t vlen = 0; - if (pVal != NULL) *pVal = (char*)rocksdb_iter_value(pCur->iter, &vlen); - if (pVLen != NULL) *pVLen = vlen; + if (pVLen != NULL) { + size_t vlen = 0; + + const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); + *pVLen = decodeValueFunc((void*)valStr, vlen, NULL, (char**)&pVal); + } + *pKey = pKtmp->key; return 0; } @@ -1600,18 +1604,14 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* rocksdb_iter_seek(pCur->iter, buf, len); if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) { - size_t vlen; - char* val = (char*)rocksdb_iter_value(pCur->iter, &vlen); - if (!streamStateValueIsStale(val)) { - SStateKey curKey; - size_t kLen = 0; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - stateKeyDecode((void*)&curKey, keyStr); + SStateKey curKey; + size_t kLen = 0; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + stateKeyDecode((void*)&curKey, keyStr); - if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { - pCur->number = pState->number; - return pCur; - } + if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { + pCur->number = pState->number; + return pCur; } } streamStateFreeCur(pCur); @@ -1900,8 +1900,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, winKeyDecode(&winKey, keyStr); const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); - // char* dst = NULL; - int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal); + int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal); if (len < 0) { return -1; }