adjust stream slice res

This commit is contained in:
54liuyao 2024-09-18 17:00:38 +08:00
parent c4566f98b8
commit f6174d0b09
1 changed files with 8 additions and 7 deletions

View File

@ -1013,10 +1013,11 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
SStreamStateCur* pCur = streamStateFillSeekToLast_rocksdb(pFileState->pFileStore); SStreamStateCur* pCur = streamStateFillSeekToLast_rocksdb(pFileState->pFileStore);
if (pCur == NULL) { if (pCur == NULL) {
return -1; return code;
} }
int32_t recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount); int32_t recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
while (code == TSDB_CODE_SUCCESS) { int32_t winRes = TSDB_CODE_SUCCESS;
while (winRes == TSDB_CODE_SUCCESS) {
if (pFileState->curRowCount >= recoverNum) { if (pFileState->curRowCount >= recoverNum) {
break; break;
} }
@ -1024,8 +1025,8 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
void* pVal = NULL; void* pVal = NULL;
int32_t vlen = 0; int32_t vlen = 0;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
code = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen); winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
if (code != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) { if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) {
destroyRowBuffPos(pNewPos); destroyRowBuffPos(pNewPos);
SListNode* pNode = tdListPopTail(pFileState->usedBuffs); SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
taosMemoryFreeClear(pNode); taosMemoryFreeClear(pNode);
@ -1036,8 +1037,8 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
memcpy(pNewPos->pRowBuff, pVal, vlen); memcpy(pNewPos->pRowBuff, pVal, vlen);
taosMemoryFreeClear(pVal); taosMemoryFreeClear(pVal);
pNewPos->beFlushed = true; pNewPos->beFlushed = true;
code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) { if (winRes != TSDB_CODE_SUCCESS) {
destroyRowBuffPos(pNewPos); destroyRowBuffPos(pNewPos);
break; break;
} }
@ -1045,7 +1046,7 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
} }
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return TSDB_CODE_SUCCESS; return code;
} }
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen, int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,