From 9c4d34ede15d5b1051a0faa12dd5c5fbf476645a Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 5 Sep 2024 09:27:36 +0800 Subject: [PATCH] fix file state issue --- source/libs/stream/src/tstreamFileState.c | 45 +++++++++++++++-------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 529ff78cae..1fe7bd46eb 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -655,18 +655,9 @@ _end: return code; } -int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) { +static int32_t recoverStateRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - if (pPos->pRowBuff) { - if (pPos->needFree) { - code = recoverSessionRowBuff(pFileState, pPos); - QUERY_CHECK_CODE(code, lino, _end); - } - (*pVal) = pPos->pRowBuff; - goto _end; - } - pPos->pRowBuff = getFreeBuff(pFileState); if (!pPos->pRowBuff) { if (pFileState->curRowCount < pFileState->maxRowCount) { @@ -687,6 +678,27 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** code = recoverSessionRowBuff(pFileState, pPos); QUERY_CHECK_CODE(code, lino, _end); +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + if (pPos->pRowBuff) { + if (pPos->needFree) { + code = recoverSessionRowBuff(pFileState, pPos); + QUERY_CHECK_CODE(code, lino, _end); + } + (*pVal) = pPos->pRowBuff; + goto _end; + } + + recoverStateRowBuff(pFileState, pPos); + (*pVal) = pPos->pRowBuff; if (!pPos->needFree) { code = tdListPrepend(pFileState->usedBuffs, &pPos); @@ -1026,13 +1038,16 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi int32_t lino = 0; (*pWinCode) = TSDB_CODE_FAILED; pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey)); - SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen); - if (pos) { + SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen); + if (ppPos) { *pVLen = pFileState->rowSize; - *pVal = *pos; - (*pos)->beUsed = true; - (*pos)->beFlushed = false; + *pVal = *ppPos; + (*ppPos)->beUsed = true; + (*ppPos)->beFlushed = false; (*pWinCode) = TSDB_CODE_SUCCESS; + if ((*ppPos)->pRowBuff == NULL) { + recoverStateRowBuff(pFileState, *ppPos); + } goto _end; } TSKEY ts = pFileState->getTs(pKey);