From 3b9e03ba175eb250c8acd40794033be4e7385f69 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 26 Aug 2022 16:56:42 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 491 +++++------------------ 1 file changed, 93 insertions(+), 398 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index b440a61469..ce31acc331 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1000,10 +1000,7 @@ _err: static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { int32_t code = 0; - // .data - while (true) { - if (pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) >= 0) break; - + while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) { SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx; code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx); if (code) goto _err; @@ -1349,6 +1346,11 @@ static int32_t tRowInfoCmprFn(const void *p1, const void *p2) { return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row); } +static SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) { + // TODO + return NULL; +} + static int32_t tsdbNextCommitRow(SCommitter *pCommitter, SRowInfo **ppInfo) { int32_t code = 0; @@ -1437,6 +1439,66 @@ _exit: return code; } +static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { + int32_t code = 0; + SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx; + + ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0); + if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) { + int32_t iBlock = 0; + SBlock block; + SBlock *pBlock = █ + SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); + + ASSERT(pRowInfo->suid == id.suid && pRowInfo->uid == id.uid); + + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); + while (pBlock && pRowInfo) { + SBlock tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)}; + int32_t c = tBlockCmprFn(pBlock, &tBlock); + + if (c < 0) { + code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); + if (code) goto _err; + + iBlock++; + if (iBlock < pCommitter->dReader.mBlock.nItem) { + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); + } else { + pBlock = NULL; + } + } else if (c > 0) { + } else { + } + } + + while (pBlock) { + code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); + if (code) goto _err; + + iBlock++; + if (iBlock < pCommitter->dReader.mBlock.nItem) { + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); + } else { + pBlock = NULL; + } + } + } + +_exit: + return code; + +_err: + tsdbError("vgId:%d tsdb merge table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + return code; +} + +static int32_t tsdbCommitTableData2(SCommitter *pCommitter, TABLEID id) { + int32_t code = 0; + // TODO + return code; +} + static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { int32_t code = 0; @@ -1447,42 +1509,45 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { if (code) goto _err; if (pRowInfo == NULL) { - // end the commit (todo) + /* end current table data commit (todo) */ + + /* end remain table data commit*/ + code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX}); + if (code) goto _err; + + if (pCommitter->dWriter.bDatal.nRow > 0) { + code = tsdbCommitLastBlock(pCommitter); + if (code) goto _err; + } + break; } if (id.suid != pRowInfo->suid || id.uid != pRowInfo->uid) { - // table changed, end current table commit (todo) + /* end current table data commit (todo) */ - // prepare the new + /* start new table data commit */ id.suid = pRowInfo->suid; id.uid = pRowInfo->uid; - } - - SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx; - if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) { - while (true) { - /* code */ - } - } - - if (pRowInfo->row.type == 0) { - code = tsdbCommitterUpdateRowSchema(pCommitter, pRowInfo->suid, pRowInfo->uid, TSDBROW_SVERSION(&pRowInfo->row)); + // reader + code = tsdbMoveCommitData(pCommitter, id); + if (code) goto _err; + // writer + tMapDataReset(&pCommitter->dWriter.mBlock); + // other + code = tsdbCommitterUpdateTableSchema(pCommitter, id.suid, id.uid); + if (code) goto _err; + code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmRow.pTSchema); if (code) goto _err; } - code = tBlockDataAppendRow(&pCommitter->dWriter.bData, &pRowInfo->row, pCommitter->skmRow.pTSchema, pRowInfo->uid); + /* merge with data in .data file */ + code = tsdbMergeTableData(pCommitter, id); if (code) goto _err; - if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) { - if (1 /*toLastOnly*/) { - code = tsdbCommitLastBlock(pCommitter); - if (code) goto _err; - } else { - code = tsdbCommitDataBlock(pCommitter, NULL); - if (code) goto _err; - } - } + /* handle remain table data */ + code = tsdbCommitTableData2(pCommitter, id); + if (code) goto _err; } return code; @@ -1491,373 +1556,3 @@ _err: tsdbError("vgId:%d tsdb commit file data impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } - -// // ================================================================================ -// typedef struct { -// SRowInfo rowInfo; -// SDataFReader *pReader; -// int32_t iLast; -// SArray *aBlockL; // SArray -// int32_t iBlockL; -// SBlockData bData; -// int32_t iRow; -// } SLDataIter; - -// typedef struct { -// SRBTreeNode *pNode; -// SRBTree rbt; -// } SDataMerger; - -// static void tDataMergerInit(SDataMerger *pMerger, SArray *aNodeP) { -// pMerger->pNode = NULL; -// tRBTreeCreate(&pMerger->rbt, tRowInfoCmprFn); -// for (int32_t iNode = 0; iNode < taosArrayGetSize(aNodeP); iNode++) { -// SRBTreeNode *pNode = (SRBTreeNode *)taosArrayGetP(aNodeP, iNode); - -// pNode = tRBTreePut(&pMerger->rbt, pNode); -// ASSERT(pNode); -// } -// } - -// static int32_t tDataMergeNext(SDataMerger *pMerger, SRowInfo **ppInfo) { -// int32_t code = 0; - -// if (pMerger->pNode) { -// // next current iter -// SLDataIter *pIter = (SLDataIter *)pMerger->pNode->payload; - -// pIter->iRow++; -// if (pIter->iRow < pIter->bData.nRow) { -// pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; -// pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); -// } else { -// pIter->iBlockL++; -// if (pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) { -// SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); -// code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pBlockL, &pIter->bData); -// if (code) goto _exit; - -// pIter->iRow = 0; -// pIter->rowInfo.suid = pIter->bData.suid; -// pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; -// pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, 0); -// } else { -// pMerger->pNode = NULL; -// } -// } - -// SRBTreeNode *pMinNode = tRBTreeMin(&pMerger->rbt); -// if (pMerger->pNode && pMinNode) { -// int32_t c = tRowInfoCmprFn(pMerger->pNode->payload, pMinNode->payload); -// if (c > 0) { -// pMerger->pNode = tRBTreePut(&pMerger->rbt, pMerger->pNode); -// ASSERT(pMerger->pNode); -// pMerger->pNode = NULL; -// } else { -// ASSERT(c); -// } -// } -// } - -// if (pMerger->pNode == NULL) { -// pMerger->pNode = tRBTreeMin(&pMerger->rbt); -// if (pMerger->pNode) { -// tRBTreeDrop(&pMerger->rbt, pMerger->pNode); -// } -// } - -// if (pMerger->pNode) { -// *ppInfo = &((SLDataIter *)pMerger->pNode->payload)[0].rowInfo; -// } else { -// *ppInfo = NULL; -// } - -// _exit: -// return code; -// } - -// typedef struct { -// STsdb *pTsdb; -// int8_t maxLast; -// int32_t minRow; -// int32_t maxRow; -// int8_t cmprAlg; -// int64_t commitID; -// STsdbFS fs; -// struct { -// SDataFReader *pReader; -// SArray *aBlockIdx; -// SLDataIter *aLDataiter[TSDB_MAX_LAST_FILE]; -// SDataMerger merger; -// } dReader; -// struct { -// SDataFWriter *pWriter; -// SArray *aBlockIdx; -// SArray *aBlockL; -// SBlockData bData; -// SBlockData bDatal; -// } dWriter; -// } STsdbMerger; - -// static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) { -// int32_t code = 0; -// STsdb *pTsdb = pMerger->pTsdb; - -// // reader -// code = tsdbDataFReaderOpen(&pMerger->dReader.pReader, pTsdb, pSet); -// if (code) goto _err; - -// code = tsdbReadBlockIdx(pMerger->dReader.pReader, pMerger->dReader.aBlockIdx); -// if (code) goto _err; - -// pMerger->dReader.merger.pNode = NULL; -// tRBTreeCreate(&pMerger->dReader.merger.rbt, tRowInfoCmprFn); -// for (int8_t iLast = 0; iLast < pSet->nLastF; iLast++) { -// SRBTreeNode *pNode = (SRBTreeNode *)taosMemoryCalloc(1, sizeof(*pNode) + sizeof(SLDataIter)); -// if (pNode == NULL) { -// code = TSDB_CODE_OUT_OF_MEMORY; -// goto _err; -// } - -// SLDataIter *pIter = (SLDataIter *)pNode->payload; -// pIter->pReader = pMerger->dReader.pReader; -// pIter->iLast = iLast; - -// pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); -// if (pIter->aBlockL == NULL) { -// code = TSDB_CODE_OUT_OF_MEMORY; -// goto _err; -// } -// code = tBlockDataCreate(&pIter->bData); -// if (code) goto _err; - -// code = tsdbReadBlockL(pMerger->dReader.pReader, iLast, pIter->aBlockL); -// if (code) goto _err; - -// if (taosArrayGetSize(pIter->aBlockL) == 0) continue; -// pIter->iBlockL = 0; - -// SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, 0); -// code = tsdbReadLastBlockEx(pMerger->dReader.pReader, iLast, pBlockL, &pIter->bData); -// if (code) goto _err; - -// pIter->iRow = 0; -// pIter->rowInfo.suid = pIter->bData.suid; -// pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; -// pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, 0); - -// pNode = tRBTreePut(&pMerger->dReader.merger.rbt, pNode); -// ASSERT(pNode); - -// pMerger->dReader.aLDataiter[iLast] = pIter; -// } - -// // writer -// SHeadFile fHead = {.commitID = pMerger->commitID}; -// SDataFile fData = *pSet->pDataF; -// SSmaFile fSma = *pSet->pSmaF; -// SLastFile fLast = {.commitID = pMerger->commitID}; -// SDFileSet wSet = {.diskId = pSet->diskId, -// .fid = pSet->fid, -// .nLastF = 1, -// .pHeadF = &fHead, -// .pDataF = &fData, -// .pSmaF = &fSma, -// .aLastF[0] = &fLast}; -// code = tsdbDataFWriterOpen(&pMerger->dWriter.pWriter, pTsdb, &wSet); -// if (code) goto _err; - -// return code; - -// _err: -// tsdbError("vgId:%d tsdb merge file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); -// return code; -// } - -// static int32_t tsdbMergeFileDataEnd(STsdbMerger *pMerger) { -// int32_t code = 0; -// STsdb *pTsdb = pMerger->pTsdb; - -// // write aBlockIdx -// code = tsdbWriteBlockIdx(pMerger->dWriter.pWriter, pMerger->dWriter.aBlockIdx); -// if (code) goto _err; - -// // write aBlockL -// code = tsdbWriteBlockL(pMerger->dWriter.pWriter, pMerger->dWriter.aBlockL); -// if (code) goto _err; - -// // update file header -// code = tsdbUpdateDFileSetHeader(pMerger->dWriter.pWriter); -// if (code) goto _err; - -// // upsert SDFileSet -// code = tsdbFSUpsertFSet(&pMerger->fs, &pMerger->dWriter.pWriter->wSet); -// if (code) goto _err; - -// // close and sync -// code = tsdbDataFWriterClose(&pMerger->dWriter.pWriter, 1); -// if (code) goto _err; - -// if (pMerger->dReader.pReader) { -// code = tsdbDataFReaderClose(&pMerger->dReader.pReader); -// if (code) goto _err; -// } - -// return code; - -// _err: -// tsdbError("vgId:%d tsdb merge file data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); -// return code; -// } - -// static int32_t tsdbMergeFileData(STsdbMerger *pMerger, SDFileSet *pSet) { -// int32_t code = 0; -// STsdb *pTsdb = pMerger->pTsdb; - -// // start -// code = tsdbMergeFileDataStart(pMerger, pSet); -// if (code) goto _err; - -// // impl -// SRowInfo *pInfo; -// TABLEID id = {0}; -// while (true) { -// code = tDataMergeNext(&pMerger->dReader.merger, &pInfo); -// if (code) goto _err; - -// if (pInfo == NULL) { -// if (pMerger->dWriter.bData.nRow > 0) { -// // TODO -// } - -// if (pMerger->dWriter.bDatal.nRow > 0) { -// // TODO -// } - -// break; -// } - -// if (id.suid != pInfo->suid || id.uid != pInfo->uid) { -// while (true) { -// // move commit the head data -// } - -// // prepare to commit next -// } - -// code = tBlockDataAppendRow(&pMerger->dWriter.bData, &pInfo->row, NULL, pInfo->uid); -// if (code) goto _err; - -// if (pMerger->dWriter.bData.nRow >= pMerger->maxRow * 4 / 5) { -// // code = tsdbCommitDataBlock(); -// if (code) goto _err; -// } -// } - -// // end -// code = tsdbMergeFileDataEnd(pMerger); -// if (code) goto _err; - -// return code; - -// _err: -// tsdbError("vgId:%d tsdb merge file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); -// return code; -// } - -// static int32_t tsdbStartMerge(STsdbMerger *pMerger, STsdb *pTsdb) { -// int32_t code = 0; - -// pMerger->pTsdb = pTsdb; -// pMerger->maxLast = TSDB_DEFAULT_LAST_FILE; -// pMerger->commitID = ++pTsdb->pVnode->state.commitID; -// code = tsdbFSCopy(pTsdb, &pMerger->fs); -// if (code) goto _exit; - -// // reader -// pMerger->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); -// if (pMerger->dReader.aBlockIdx == NULL) { -// code = TSDB_CODE_OUT_OF_MEMORY; -// goto _exit; -// } -// // for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { -// // pMerger->dReader.aBlockL[iLast] = taosArrayInit(0, sizeof(SBlockL)); -// // if (pMerger->dReader.aBlockL[iLast] == NULL) { -// // code = TSDB_CODE_OUT_OF_MEMORY; -// // goto _exit; -// // } -// // } - -// // writer -// pMerger->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); -// if (pMerger->dWriter.aBlockIdx == NULL) { -// code = TSDB_CODE_OUT_OF_MEMORY; -// goto _exit; -// } -// pMerger->dWriter.aBlockL = taosArrayInit(0, sizeof(SBlockL)); -// if (pMerger->dWriter.aBlockL == NULL) { -// code = TSDB_CODE_OUT_OF_MEMORY; -// goto _exit; -// } - -// _exit: -// return code; -// } - -// static int32_t tsdbEndMerge(STsdbMerger *pMerger) { -// int32_t code = 0; -// STsdb *pTsdb = pMerger->pTsdb; - -// code = tsdbFSCommit1(pTsdb, &pMerger->fs); -// if (code) goto _err; - -// taosThreadRwlockWrlock(&pTsdb->rwLock); -// code = tsdbFSCommit2(pTsdb, &pMerger->fs); -// if (code) { -// taosThreadRwlockUnlock(&pTsdb->rwLock); -// goto _err; -// } -// taosThreadRwlockUnlock(&pTsdb->rwLock); - -// // writer -// taosArrayDestroy(pMerger->dWriter.aBlockL); -// taosArrayDestroy(pMerger->dWriter.aBlockIdx); - -// // reader -// // for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { -// // taosArrayDestroy(pMerger->dReader.aBlockL[iLast]); -// // } -// taosArrayDestroy(pMerger->dReader.aBlockIdx); -// tsdbFSDestroy(&pMerger->fs); - -// return code; - -// _err: -// tsdbError("vgId:%d, tsdb end merge failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); -// return code; -// } - -// int32_t tsdbMerge(STsdb *pTsdb) { -// int32_t code = 0; -// STsdbMerger merger = {0}; - -// code = tsdbStartMerge(&merger, pTsdb); -// if (code) goto _err; - -// for (int32_t iSet = 0; iSet < taosArrayGetSize(merger.fs.aDFileSet); iSet++) { -// SDFileSet *pSet = (SDFileSet *)taosArrayGet(merger.fs.aDFileSet, iSet); -// if (pSet->nLastF < merger.maxLast) continue; - -// code = tsdbMergeFileData(&merger, pSet); -// if (code) goto _err; -// } - -// code = tsdbEndMerge(&merger); -// if (code) goto _err; - -// return code; - -// _err: -// tsdbError("vgId:%d tsdb merge failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); -// return code; -// } \ No newline at end of file