From 659e8736022b17f8808e80a7be3fab574535bd19 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 12 Jul 2024 13:41:22 +0800 Subject: [PATCH] adj flush state rule --- source/libs/stream/src/tstreamFileState.c | 27 ++++++++++++++--------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 5c022c2a5b..ea9857ffef 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -303,7 +303,7 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { } } -void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max) { +void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool all) { uint64_t i = 0; SListIter iter = {0}; tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD); @@ -311,14 +311,19 @@ void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushLi SListNode* pNode = NULL; while ((pNode = tdListNext(&iter)) != NULL && i < max) { SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; - if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0) && !pPos->beUsed) { - tdListAppend(pFlushList, &pPos); - pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); - pFileState->stateBuffRemoveByPosFn(pFileState, pPos); - tdListPopNode(pFileState->usedBuffs, pNode); - taosMemoryFreeClear(pNode); - if (pPos->pRowBuff) { - i++; + if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0)) { + if (all || !pPos->beUsed) { + if (all && !pPos->pRowBuff) { + continue; + } + tdListAppend(pFlushList, &pPos); + pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); + pFileState->stateBuffRemoveByPosFn(pFileState, pPos); + tdListPopNode(pFileState->usedBuffs, pNode); + taosMemoryFreeClear(pNode); + if (pPos->pRowBuff) { + i++; + } } } } @@ -370,7 +375,7 @@ int32_t flushRowBuff(SStreamFileState* pFileState) { uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO); num = TMAX(num, FLUSH_NUM); - clearFlushedRowBuff(pFileState, pFlushList, num); + clearFlushedRowBuff(pFileState, pFlushList, num, false); if (isListEmpty(pFlushList)) { popUsedBuffs(pFileState, pFlushList, num, false); @@ -380,7 +385,7 @@ int32_t flushRowBuff(SStreamFileState* pFileState) { } if (pFileState->searchBuff) { - clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount); + clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount, true); }