adj flush state rule

This commit is contained in:
54liuyao 2024-07-12 13:41:22 +08:00
parent 6c84d926ab
commit 659e873602
1 changed files with 16 additions and 11 deletions

View File

@ -303,7 +303,7 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
} }
} }
void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max) { void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool all) {
uint64_t i = 0; uint64_t i = 0;
SListIter iter = {0}; SListIter iter = {0};
tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD); tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
@ -311,14 +311,19 @@ void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushLi
SListNode* pNode = NULL; SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL && i < max) { while ((pNode = tdListNext(&iter)) != NULL && i < max) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0) && !pPos->beUsed) { if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0)) {
tdListAppend(pFlushList, &pPos); if (all || !pPos->beUsed) {
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); if (all && !pPos->pRowBuff) {
pFileState->stateBuffRemoveByPosFn(pFileState, pPos); continue;
tdListPopNode(pFileState->usedBuffs, pNode); }
taosMemoryFreeClear(pNode); tdListAppend(pFlushList, &pPos);
if (pPos->pRowBuff) { pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
i++; pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(pNode);
if (pPos->pRowBuff) {
i++;
}
} }
} }
} }
@ -370,7 +375,7 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO); uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
num = TMAX(num, FLUSH_NUM); num = TMAX(num, FLUSH_NUM);
clearFlushedRowBuff(pFileState, pFlushList, num); clearFlushedRowBuff(pFileState, pFlushList, num, false);
if (isListEmpty(pFlushList)) { if (isListEmpty(pFlushList)) {
popUsedBuffs(pFileState, pFlushList, num, false); popUsedBuffs(pFileState, pFlushList, num, false);
@ -380,7 +385,7 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
} }
if (pFileState->searchBuff) { if (pFileState->searchBuff) {
clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount); clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount, true);
} }