From c9ab455befa96e91379814cfdd9622f75bd8ec5a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 27 Jun 2022 13:05:17 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 117 ++++++++++++++++------- 1 file changed, 80 insertions(+), 37 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 4e4ac72f93..855b43dea7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -614,8 +614,8 @@ _err: return code; } -static int32_t tsdbMergeBlockAndMem(SCommitter *pCommitter, STbDataIter *pIter, SBlockIdx *pBlockIdx, SBlock *pBlock, - TSDBKEY toKey /*not included*/, int8_t toDataOnly) { +static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlockIdx *pBlockIdx, SBlock *pBlock, + TSDBKEY toKey /*not included*/, int8_t toDataOnly) { int32_t code = 0; SBlockData *pBlockDataFrom = &pCommitter->oBlockData; SBlockData *pBlockDataTo = &pCommitter->nBlockData; @@ -692,7 +692,7 @@ _err: return code; } -static int32_t tsdbCommitMemoryDataImpl(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) { +static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) { int32_t code = 0; TSDBROW *pRow; SBlockData *pBlockData = &pCommitter->nBlockData; @@ -729,16 +729,22 @@ _err: return code; } -static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock) { +static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, SBlockIdx *pBlockIdx) { int32_t code = 0; if (pBlock->last) { - // TODO - code = tsdbReadBlockData(pCommitter->pReader, NULL, pBlock, &pCommitter->oBlockData, NULL, NULL); + code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, NULL); if (code) goto _err; - code = - tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->oBlockData, NULL, NULL, NULL, NULL, pCommitter->cmprAlg); + tBlockReset(&pCommitter->nBlock); + pCommitter->nBlock.minKey = pBlock->minKey; + pCommitter->nBlock.maxKey = pBlock->maxKey; + pCommitter->nBlock.minVersion = pBlock->minVersion; + pCommitter->nBlock.nRow = pBlock->nRow; + pCommitter->nBlock.last = pBlock->last; + pCommitter->nBlock.hasDup = pBlock->hasDup; + code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->oBlockData, NULL, NULL, pBlockIdx, &pCommitter->nBlock, + pCommitter->cmprAlg); if (code) goto _err; code = tMapDataPutItem(&pCommitter->nBlockMap, &pCommitter->nBlock, tPutBlock); @@ -792,8 +798,8 @@ static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlock if ((pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey) && pBlock) { if (pBlock->last) { // merge memory data and disk data to write to .data/.last (todo) - code = tsdbMergeBlockAndMem(pCommitter, pIter, oBlockIdx, pBlock, - (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); + code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, + (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); @@ -803,13 +809,13 @@ static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlock if (c < 0) { // commit memory data until pBlock->minKey (not included) only to .data file (todo) - code = tsdbCommitMemoryDataImpl(pCommitter, pIter, pBlock->minKey, 1); + code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1); if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); } else if (c > 0) { // just move the block (todo) - code = tsdbCommitTableDiskData(pCommitter, pBlock); + // code = tsdbCommitTableDiskData(pCommitter, pBlock); if (code) goto _err; iBlock++; @@ -821,12 +827,12 @@ static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlock } else { if (iBlock == nBlock - 1) { // merge memory data and disk data to .data/.last file - code = tsdbMergeBlockAndMem(pCommitter, pIter, oBlockIdx, pBlock, - (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); + code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, + (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); if (code) goto _err; } else { // merge memory data and disk data to .data file only until pBlock[1]. - code = tsdbMergeBlockAndMem(pCommitter, pIter, oBlockIdx, pBlock, (TSDBKEY){0} /*TODO*/, 1); + code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, (TSDBKEY){0} /*TODO*/, 1); } } @@ -835,15 +841,15 @@ static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlock } } } else if (pBlock) { - code = tsdbCommitTableDiskData(pCommitter, pBlock); + // code = tsdbCommitTableDiskData(pCommitter, pBlock); if (code) goto _err; iBlock++; // next block } else { // commit only memory data until (pCommitter->maxKey, VERSION_MAX) - code = tsdbCommitMemoryDataImpl(pCommitter, pIter, - (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); + code = + tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); @@ -851,8 +857,8 @@ static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlock } // end ===================== - // // SBlock - // code = tsdbWriteBlock(pCommitter->pWriter, mBlock, NULL, pBlockIdx); + // SBlock + // code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx); // if (code) goto _err; // // SBlockIdx @@ -871,16 +877,37 @@ _err: return code; } +static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) { + int32_t code = 0; + SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = suid, .uid = uid}; + + code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx); + if (code) goto _err; + + code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx); + if (code) goto _err; + + return code; + +_err: + tsdbError("vgId:%d commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + return code; +} + static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) { int32_t code = 0; STbDataIter *pIter = &(STbDataIter){0}; TSDBROW *pRow; int32_t iBlock = 0; int32_t nBlock; + int64_t suid; + int64_t uid; if (pTbData) { tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter); pRow = tsdbTbDataIterGet(pIter); + suid = pTbData->suid; + uid = pTbData->uid; } else { pIter = NULL; pRow = NULL; @@ -892,6 +919,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl nBlock = pCommitter->oBlockMap.nItem; ASSERT(nBlock > 0); + suid = pBlockIdx->suid; + uid = pBlockIdx->uid; } else { nBlock = 0; } @@ -900,17 +929,23 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl // start =========== tMapDataReset(&pCommitter->nBlockMap); - SBlock *pBlock = NULL; // (todo) + SBlock *pBlock = &pCommitter->oBlock; int32_t c; + if (iBlock < nBlock) { + tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); + } else { + pBlock = NULL; + } + // merge =========== while (true) { if (((pRow == NULL) || TSDBROW_TS(pRow) > pCommitter->maxKey) && pBlock == NULL) break; if (pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey && pBlock) { if (pBlock->last) { - code = tsdbMergeBlockAndMem(pCommitter, pIter, pBlockIdx, pBlock, - (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); + code = tsdbMergeTableData(pCommitter, pIter, pBlockIdx, pBlock, + (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); @@ -918,12 +953,18 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl } else { c = tBlockCmprFn(&(SBlock){}, pBlock); if (c > 0) { - code = tsdbCommitTableDiskData(pCommitter, pBlock); + code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx); if (code) goto _err; iBlock++; + iBlock++; + if (iBlock < nBlock) { + tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); + } else { + pBlock = NULL; + } } else if (c < 0) { - code = tsdbCommitMemoryDataImpl(pCommitter, pIter, pBlock->minKey, 1); + code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1); if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); @@ -933,25 +974,30 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl // add as a subblock } else { if (iBlock == nBlock - 1) { - code = tsdbMergeBlockAndMem(pCommitter, pIter, pBlockIdx, pBlock, - (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); + code = tsdbMergeTableData(pCommitter, pIter, pBlockIdx, pBlock, + (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); if (code) goto _err; } else { - // code = tsdbMergeBlockAndMem(pCommitter, pIter, pBlockIdx, pBlock, pBlock[1].minKey, 1); + // code = tsdbMergeTableData(pCommitter, pIter, pBlockIdx, pBlock, pBlock[1].minKey, 1); if (code) goto _err; } } } } } else if (pBlock) { - code = tsdbCommitTableDiskData(pCommitter, pBlock); + code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx); if (code) goto _err; - // move to next block (todo) + iBlock++; + if (iBlock < nBlock) { + tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); + } else { + pBlock = NULL; + } } else { - code = tsdbCommitMemoryDataImpl(pCommitter, pIter, - (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); + code = + tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); @@ -959,11 +1005,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl } } - // end - // code = tsdbWriteBlock(); - if (code) goto _err; - - code = tMapDataPutItem(&pCommitter->nBlockIdxMap, NULL, tPutBlockIdx); + // end ===================== + code = tsdbCommitTableDataEnd(pCommitter, suid, uid); if (code) goto _err; _exit: