From d862aa0f3baab87617040553e6695473a46a6f1b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 28 Jun 2022 02:35:37 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 41 ++++++++++++++++++------ 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index fb80d1487c..65dc2203c2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -636,7 +636,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB // loop to merge pRow1 = tsdbTbDataIterGet(pIter); *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0); - ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0); + ASSERT(pRow1 && tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0); ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0); code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); if (code) goto _err; @@ -668,7 +668,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB pRow1 = NULL; } } - } else if (tsdbRowCmprFn(pRow1, pRow2) < 0) { + } else if (tsdbRowCmprFn(pRow1, pRow2) > 0) { *pRow = *pRow2; if (pRow2->iRow + 1 < pBlockDataMerge->nRow) { @@ -702,7 +702,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB } } - code = tBlockDataAppendRow(pBlockData, &row, pCommitter->pTSchema); + code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); if (code) goto _err; pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow)); @@ -754,8 +754,9 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter tBlockReset(pBlock); tBlockDataReset(pBlockData); pRow = tsdbTbDataIterGet(pIter); + ASSERT(pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) < 0); while (true) { - if (pRow == NULL || tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) { + if (pRow == NULL) { if (pBlockData->nRow > 0) { goto _write_block; } else { @@ -783,6 +784,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter tsdbTbDataIterNext(pIter); pRow = tsdbTbDataIterGet(pIter); + if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL; if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block; continue; @@ -983,7 +985,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl int32_t code = 0; STbDataIter *pIter = &(STbDataIter){0}; TSDBROW *pRow; - int32_t iBlock = 0; + int32_t iBlock; int32_t nBlock; int64_t suid; int64_t uid; @@ -991,6 +993,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl if (pTbData) { tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter); pRow = tsdbTbDataIterGet(pIter); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; + suid = pTbData->suid; uid = pTbData->uid; } else { @@ -1004,18 +1008,20 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl nBlock = pCommitter->oBlockMap.nItem; ASSERT(nBlock > 0); + suid = pBlockIdx->suid; uid = pBlockIdx->uid; } else { nBlock = 0; } - if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && nBlock == 0) goto _exit; + if (pRow == NULL && nBlock == 0) goto _exit; // start =========== tMapDataReset(&pCommitter->nBlockMap); SBlock *pBlock = &pCommitter->oBlock; + iBlock = 0; if (iBlock < nBlock) { tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); } else { @@ -1024,24 +1030,28 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl // merge =========== while (true) { - if (((pRow == NULL) || TSDBROW_TS(pRow) > pCommitter->maxKey) && pBlock == NULL) break; + if (pRow == NULL && pBlock == NULL) break; - if (pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey && pBlock) { + if (pRow && pBlock) { if (pBlock->last) { code = tsdbMergeTableData(pCommitter, pIter, pBlock, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; iBlock++; if (iBlock < nBlock) { tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); } else { pBlock = NULL; } + + ASSERT(pRow == NULL && pBlock == NULL); } else { int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock); if (c > 0) { + // only disk data code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx); if (code) goto _err; @@ -1052,11 +1062,14 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl pBlock = NULL; } } else if (c < 0) { + // only memory data code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1); if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; } else { + // merge memory and disk int64_t nOvlp = 0; // (todo) if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) { // add as a subblock @@ -1071,6 +1084,15 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl if (code) goto _err; } } + + pRow = tsdbTbDataIterGet(pIter); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; + iBlock++; + if (iBlock < nBlock) { + tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); + } else { + pBlock = NULL; + } } } } else if (pBlock) { @@ -1089,7 +1111,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); - ASSERT(pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; + ASSERT(pRow == NULL); } }