From f6174d0b095c2a1c6b6d8e3e7b1529085ce3817c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 18 Sep 2024 17:00:38 +0800 Subject: [PATCH] adjust stream slice res --- source/libs/stream/src/tstreamFileState.c | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 7cd9fb7ea3..923aaa945b 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -1013,10 +1013,11 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) { SStreamStateCur* pCur = streamStateFillSeekToLast_rocksdb(pFileState->pFileStore); if (pCur == NULL) { - return -1; + return code; } int32_t recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount); - while (code == TSDB_CODE_SUCCESS) { + int32_t winRes = TSDB_CODE_SUCCESS; + while (winRes == TSDB_CODE_SUCCESS) { if (pFileState->curRowCount >= recoverNum) { break; } @@ -1024,8 +1025,8 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) { void* pVal = NULL; int32_t vlen = 0; SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); - code = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen); - if (code != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) { + winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen); + if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) { destroyRowBuffPos(pNewPos); SListNode* pNode = tdListPopTail(pFileState->usedBuffs); taosMemoryFreeClear(pNode); @@ -1036,8 +1037,8 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) { memcpy(pNewPos->pRowBuff, pVal, vlen); taosMemoryFreeClear(pVal); pNewPos->beFlushed = true; - code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); - if (code != TSDB_CODE_SUCCESS) { + winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); + if (winRes != TSDB_CODE_SUCCESS) { destroyRowBuffPos(pNewPos); break; } @@ -1045,7 +1046,7 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) { } streamStateFreeCur(pCur); - return TSDB_CODE_SUCCESS; + return code; } int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,