From d31459dc08a7bcdac8bc4fcb636567a0168edb1f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 27 Aug 2022 17:11:00 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 120 +++++++++++------------ 1 file changed, 55 insertions(+), 65 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index daee624f47..59e5d36064 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -78,7 +78,7 @@ typedef struct { SRBTree rbt; SDataIter dataIter; SDataIter aDataIter[TSDB_MAX_LAST_FILE]; - int8_t toLast; + int8_t toLastOnly; }; struct { SDataFWriter *pWriter; @@ -403,30 +403,31 @@ _exit: static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { int32_t code = 0; - tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn); pCommitter->pIter = NULL; + tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn); // memory + TSDBKEY tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN}; SDataIter *pIter = &pCommitter->dataIter; pIter->type = MEMORY_DATA_ITER; pIter->iTbDataP = 0; for (; pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP); pIter->iTbDataP++) { STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP); - TSDBKEY tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN}; tsdbTbDataIterOpen(pTbData, &tKey, 0, &pIter->iter); TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter); - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); - continue; + pRow = NULL; } + if (pRow == NULL) continue; + pIter->r.suid = pTbData->suid; pIter->r.uid = pTbData->uid; pIter->r.row = *pRow; break; } - + ASSERT(pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP)); tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter); // disk @@ -448,17 +449,22 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { code = tsdbReadLastBlockEx(pCommitter->dReader.pReader, iLast, pBlockL, &pIter->bData); if (code) goto _err; + pIter->iRow = 0; pIter->r.suid = pIter->bData.suid; - pIter->r.uid = pIter->bData.uid; + pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0); tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter); iIter++; } - pCommitter->toLast = 0; + if (iIter > 0) { + pCommitter->toLastOnly = 1; + } else { + pCommitter->toLastOnly = 0; + } } else { - pCommitter->toLast = 1; + pCommitter->toLastOnly = 0; } code = tsdbNextCommitRow(pCommitter); @@ -482,8 +488,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { pCommitter->nextKey = TSKEY_MAX; // Reader - pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &(SDFileSet){.fid = pCommitter->commitFid}, - tDFileSetCmprFn, TD_EQ); + SDFileSet tDFileSet = {.fid = pCommitter->commitFid}; + pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &tDFileSet, tDFileSetCmprFn, TD_EQ); if (pRSet) { code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet); if (code) goto _err; @@ -493,10 +499,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { if (code) goto _err; pCommitter->dReader.iBlockIdx = 0; - if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) { - pCommitter->dReader.pBlockIdx = - (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx); - + if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) { + pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0); code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); if (code) goto _err; } else { @@ -508,47 +512,32 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { } // Writer - SHeadFile fHead; - SDataFile fData; - SSmaFile fSma; - SLastFile fLast; - SDFileSet wSet = {0}; + SHeadFile fHead = {.commitID = pCommitter->commitID}; + SDataFile fData = {.commitID = pCommitter->commitID}; + SSmaFile fSma = {.commitID = pCommitter->commitID}; + SLastFile fLast = {.commitID = pCommitter->commitID}; + SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma}; if (pRSet) { - ASSERT(pCommitter->maxLast == 1 || pRSet->nLastF < pCommitter->maxLast); - - fHead = (SHeadFile){.commitID = pCommitter->commitID}; + ASSERT(pRSet->nLastF <= pCommitter->maxLast); fData = *pRSet->pDataF; fSma = *pRSet->pSmaF; - fLast = (SLastFile){.commitID = pCommitter->commitID}; - wSet.diskId = pRSet->diskId; - wSet.fid = pCommitter->commitFid; - wSet.pHeadF = &fHead; - wSet.pDataF = &fData; - wSet.pSmaF = &fSma; - for (int8_t iLast = 0; iLast < pRSet->nLastF; iLast++) { - wSet.aLastF[iLast] = pRSet->aLastF[iLast]; + if (pRSet->nLastF < pCommitter->maxLast) { + for (int32_t iLast = 0; iLast < pRSet->nLastF; iLast++) { + wSet.aLastF[iLast] = pRSet->aLastF[iLast]; + } + wSet.nLastF = pRSet->nLastF + 1; + } else { + wSet.nLastF = 1; } - wSet.nLastF = pRSet->nLastF + 1; - wSet.aLastF[wSet.nLastF - 1] = &fLast; // todo } else { - fHead = (SHeadFile){.commitID = pCommitter->commitID}; - fData = (SDataFile){.commitID = pCommitter->commitID}; - fSma = (SSmaFile){.commitID = pCommitter->commitID}; - fLast = (SLastFile){.commitID = pCommitter->commitID}; - SDiskID did = {0}; tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did); tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); - wSet.diskId = did; - wSet.fid = pCommitter->commitFid; - wSet.pHeadF = &fHead; - wSet.pDataF = &fData; - wSet.pSmaF = &fSma; wSet.nLastF = 1; - wSet.aLastF[0] = &fLast; } + wSet.aLastF[wSet.nLastF - 1] = &fLast; code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet); if (code) goto _err; @@ -1722,6 +1711,9 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { pBlock = NULL; } } + + code = tsdbCommitterNextTableData(pCommitter); + if (code) goto _err; } _exit: @@ -1791,34 +1783,20 @@ _err: static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { int32_t code = 0; - SRowInfo *pRowInfo = NULL; + SRowInfo *pRowInfo; TABLEID id = {0}; - while (true) { - pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo == NULL) { - /* end remain table data commit*/ - code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX}); - if (code) goto _err; - - if (pCommitter->dWriter.bDatal.nRow > 0) { - code = tsdbCommitLastBlock(pCommitter); - if (code) goto _err; - } - - break; - } - + while ((pRowInfo = tsdbGetCommitRow(pCommitter)) != NULL) { ASSERT(pRowInfo->suid != id.suid || pRowInfo->uid != id.uid); - - /* start new table data commit */ id.suid = pRowInfo->suid; id.uid = pRowInfo->uid; - // reader + code = tsdbMoveCommitData(pCommitter, id); if (code) goto _err; - // writer + + // start tMapDataReset(&pCommitter->dWriter.mBlock); - // other + + // impl code = tsdbCommitterUpdateTableSchema(pCommitter, id.suid, id.uid); if (code) goto _err; code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); @@ -1834,6 +1812,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { code = tsdbCommitTableData(pCommitter, id); if (code) goto _err; + // end if (pCommitter->dWriter.mBlock.nItem > 0) { SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid}; code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); @@ -1846,6 +1825,17 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { } } + id.suid = INT64_MAX; + id.uid = INT64_MAX; + code = tsdbMoveCommitData(pCommitter, id); + if (code) goto _err; + + // TODO: here may have problem + if (pCommitter->dWriter.bDatal.nRow > 0) { + code = tsdbCommitLastBlock(pCommitter); + if (code) goto _err; + } + return code; _err: