From 90b32809c5cebc7810223d885228bf6f02b27678 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 26 Aug 2022 15:48:44 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 753 ++++++++++++----------- 1 file changed, 380 insertions(+), 373 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 4b43871096..b440a61469 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1024,6 +1024,7 @@ _err: return code; } +static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter); static int32_t tsdbCommitFileData(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; @@ -1033,6 +1034,11 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { code = tsdbCommitFileDataStart(pCommitter); if (code) goto _err; +#if 1 + // impl + code = tsdbCommitFileDataImpl(pCommitter); + if (code) goto _err; +#else // commit file data impl for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) { STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData); @@ -1059,6 +1065,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { code = tsdbCommitLastBlock(pCommitter); if (code) goto _err; } +#endif // commit file data end code = tsdbCommitFileDataEnd(pCommitter); @@ -1309,10 +1316,10 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { tsdbFSDestroy(&pCommitter->fs); taosArrayDestroy(pCommitter->aTbDataP); - if (pCommitter->toMerge) { - code = tsdbMerge(pTsdb); - if (code) goto _err; - } + // if (pCommitter->toMerge) { + // code = tsdbMerge(pTsdb); + // if (code) goto _err; + // } tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode)); return code; @@ -1485,372 +1492,372 @@ _err: return code; } -// ================================================================================ -typedef struct { - SRowInfo rowInfo; - SDataFReader *pReader; - int32_t iLast; - SArray *aBlockL; // SArray - int32_t iBlockL; - SBlockData bData; - int32_t iRow; -} SLDataIter; - -typedef struct { - SRBTreeNode *pNode; - SRBTree rbt; -} SDataMerger; - -static void tDataMergerInit(SDataMerger *pMerger, SArray *aNodeP) { - pMerger->pNode = NULL; - tRBTreeCreate(&pMerger->rbt, tRowInfoCmprFn); - for (int32_t iNode = 0; iNode < taosArrayGetSize(aNodeP); iNode++) { - SRBTreeNode *pNode = (SRBTreeNode *)taosArrayGetP(aNodeP, iNode); - - pNode = tRBTreePut(&pMerger->rbt, pNode); - ASSERT(pNode); - } -} - -static int32_t tDataMergeNext(SDataMerger *pMerger, SRowInfo **ppInfo) { - int32_t code = 0; - - if (pMerger->pNode) { - // next current iter - SLDataIter *pIter = (SLDataIter *)pMerger->pNode->payload; - - pIter->iRow++; - if (pIter->iRow < pIter->bData.nRow) { - pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; - pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); - } else { - pIter->iBlockL++; - if (pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) { - SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); - code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pBlockL, &pIter->bData); - if (code) goto _exit; - - pIter->iRow = 0; - pIter->rowInfo.suid = pIter->bData.suid; - pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; - pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, 0); - } else { - pMerger->pNode = NULL; - } - } - - SRBTreeNode *pMinNode = tRBTreeMin(&pMerger->rbt); - if (pMerger->pNode && pMinNode) { - int32_t c = tRowInfoCmprFn(pMerger->pNode->payload, pMinNode->payload); - if (c > 0) { - pMerger->pNode = tRBTreePut(&pMerger->rbt, pMerger->pNode); - ASSERT(pMerger->pNode); - pMerger->pNode = NULL; - } else { - ASSERT(c); - } - } - } - - if (pMerger->pNode == NULL) { - pMerger->pNode = tRBTreeMin(&pMerger->rbt); - if (pMerger->pNode) { - tRBTreeDrop(&pMerger->rbt, pMerger->pNode); - } - } - - if (pMerger->pNode) { - *ppInfo = &((SLDataIter *)pMerger->pNode->payload)[0].rowInfo; - } else { - *ppInfo = NULL; - } - -_exit: - return code; -} - -typedef struct { - STsdb *pTsdb; - int8_t maxLast; - int32_t minRow; - int32_t maxRow; - int8_t cmprAlg; - int64_t commitID; - STsdbFS fs; - struct { - SDataFReader *pReader; - SArray *aBlockIdx; - SLDataIter *aLDataiter[TSDB_MAX_LAST_FILE]; - SDataMerger merger; - } dReader; - struct { - SDataFWriter *pWriter; - SArray *aBlockIdx; - SArray *aBlockL; - SBlockData bData; - SBlockData bDatal; - } dWriter; -} STsdbMerger; - -static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) { - int32_t code = 0; - STsdb *pTsdb = pMerger->pTsdb; - - // reader - code = tsdbDataFReaderOpen(&pMerger->dReader.pReader, pTsdb, pSet); - if (code) goto _err; - - code = tsdbReadBlockIdx(pMerger->dReader.pReader, pMerger->dReader.aBlockIdx); - if (code) goto _err; - - pMerger->dReader.merger.pNode = NULL; - tRBTreeCreate(&pMerger->dReader.merger.rbt, tRowInfoCmprFn); - for (int8_t iLast = 0; iLast < pSet->nLastF; iLast++) { - SRBTreeNode *pNode = (SRBTreeNode *)taosMemoryCalloc(1, sizeof(*pNode) + sizeof(SLDataIter)); - if (pNode == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - SLDataIter *pIter = (SLDataIter *)pNode->payload; - pIter->pReader = pMerger->dReader.pReader; - pIter->iLast = iLast; - - pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); - if (pIter->aBlockL == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - code = tBlockDataCreate(&pIter->bData); - if (code) goto _err; - - code = tsdbReadBlockL(pMerger->dReader.pReader, iLast, pIter->aBlockL); - if (code) goto _err; - - if (taosArrayGetSize(pIter->aBlockL) == 0) continue; - pIter->iBlockL = 0; - - SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, 0); - code = tsdbReadLastBlockEx(pMerger->dReader.pReader, iLast, pBlockL, &pIter->bData); - if (code) goto _err; - - pIter->iRow = 0; - pIter->rowInfo.suid = pIter->bData.suid; - pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; - pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, 0); - - pNode = tRBTreePut(&pMerger->dReader.merger.rbt, pNode); - ASSERT(pNode); - - pMerger->dReader.aLDataiter[iLast] = pIter; - } - - // writer - SHeadFile fHead = {.commitID = pMerger->commitID}; - SDataFile fData = *pSet->pDataF; - SSmaFile fSma = *pSet->pSmaF; - SLastFile fLast = {.commitID = pMerger->commitID}; - SDFileSet wSet = {.diskId = pSet->diskId, - .fid = pSet->fid, - .nLastF = 1, - .pHeadF = &fHead, - .pDataF = &fData, - .pSmaF = &fSma, - .aLastF[0] = &fLast}; - code = tsdbDataFWriterOpen(&pMerger->dWriter.pWriter, pTsdb, &wSet); - if (code) goto _err; - - return code; - -_err: - tsdbError("vgId:%d tsdb merge file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbMergeFileDataEnd(STsdbMerger *pMerger) { - int32_t code = 0; - STsdb *pTsdb = pMerger->pTsdb; - - // write aBlockIdx - code = tsdbWriteBlockIdx(pMerger->dWriter.pWriter, pMerger->dWriter.aBlockIdx); - if (code) goto _err; - - // write aBlockL - code = tsdbWriteBlockL(pMerger->dWriter.pWriter, pMerger->dWriter.aBlockL); - if (code) goto _err; - - // update file header - code = tsdbUpdateDFileSetHeader(pMerger->dWriter.pWriter); - if (code) goto _err; - - // upsert SDFileSet - code = tsdbFSUpsertFSet(&pMerger->fs, &pMerger->dWriter.pWriter->wSet); - if (code) goto _err; - - // close and sync - code = tsdbDataFWriterClose(&pMerger->dWriter.pWriter, 1); - if (code) goto _err; - - if (pMerger->dReader.pReader) { - code = tsdbDataFReaderClose(&pMerger->dReader.pReader); - if (code) goto _err; - } - - return code; - -_err: - tsdbError("vgId:%d tsdb merge file data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbMergeFileData(STsdbMerger *pMerger, SDFileSet *pSet) { - int32_t code = 0; - STsdb *pTsdb = pMerger->pTsdb; - - // start - code = tsdbMergeFileDataStart(pMerger, pSet); - if (code) goto _err; - - // impl - SRowInfo *pInfo; - TABLEID id = {0}; - while (true) { - code = tDataMergeNext(&pMerger->dReader.merger, &pInfo); - if (code) goto _err; - - if (pInfo == NULL) { - if (pMerger->dWriter.bData.nRow > 0) { - // TODO - } - - if (pMerger->dWriter.bDatal.nRow > 0) { - // TODO - } - - break; - } - - if (id.suid != pInfo->suid || id.uid != pInfo->uid) { - while (true) { - // move commit the head data - } - - // prepare to commit next - } - - code = tBlockDataAppendRow(&pMerger->dWriter.bData, &pInfo->row, NULL, pInfo->uid); - if (code) goto _err; - - if (pMerger->dWriter.bData.nRow >= pMerger->maxRow * 4 / 5) { - // code = tsdbCommitDataBlock(); - if (code) goto _err; - } - } - - // end - code = tsdbMergeFileDataEnd(pMerger); - if (code) goto _err; - - return code; - -_err: - tsdbError("vgId:%d tsdb merge file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbStartMerge(STsdbMerger *pMerger, STsdb *pTsdb) { - int32_t code = 0; - - pMerger->pTsdb = pTsdb; - pMerger->maxLast = TSDB_DEFAULT_LAST_FILE; - pMerger->commitID = ++pTsdb->pVnode->state.commitID; - code = tsdbFSCopy(pTsdb, &pMerger->fs); - if (code) goto _exit; - - // reader - pMerger->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pMerger->dReader.aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - // for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { - // pMerger->dReader.aBlockL[iLast] = taosArrayInit(0, sizeof(SBlockL)); - // if (pMerger->dReader.aBlockL[iLast] == NULL) { - // code = TSDB_CODE_OUT_OF_MEMORY; - // goto _exit; - // } - // } - - // writer - pMerger->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pMerger->dWriter.aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - pMerger->dWriter.aBlockL = taosArrayInit(0, sizeof(SBlockL)); - if (pMerger->dWriter.aBlockL == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - -_exit: - return code; -} - -static int32_t tsdbEndMerge(STsdbMerger *pMerger) { - int32_t code = 0; - STsdb *pTsdb = pMerger->pTsdb; - - code = tsdbFSCommit1(pTsdb, &pMerger->fs); - if (code) goto _err; - - taosThreadRwlockWrlock(&pTsdb->rwLock); - code = tsdbFSCommit2(pTsdb, &pMerger->fs); - if (code) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _err; - } - taosThreadRwlockUnlock(&pTsdb->rwLock); - - // writer - taosArrayDestroy(pMerger->dWriter.aBlockL); - taosArrayDestroy(pMerger->dWriter.aBlockIdx); - - // reader - // for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { - // taosArrayDestroy(pMerger->dReader.aBlockL[iLast]); - // } - taosArrayDestroy(pMerger->dReader.aBlockIdx); - tsdbFSDestroy(&pMerger->fs); - - return code; - -_err: - tsdbError("vgId:%d, tsdb end merge failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - -int32_t tsdbMerge(STsdb *pTsdb) { - int32_t code = 0; - STsdbMerger merger = {0}; - - code = tsdbStartMerge(&merger, pTsdb); - if (code) goto _err; - - for (int32_t iSet = 0; iSet < taosArrayGetSize(merger.fs.aDFileSet); iSet++) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(merger.fs.aDFileSet, iSet); - if (pSet->nLastF < merger.maxLast) continue; - - code = tsdbMergeFileData(&merger, pSet); - if (code) goto _err; - } - - code = tsdbEndMerge(&merger); - if (code) goto _err; - - return code; - -_err: - tsdbError("vgId:%d tsdb merge failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} \ No newline at end of file +// // ================================================================================ +// typedef struct { +// SRowInfo rowInfo; +// SDataFReader *pReader; +// int32_t iLast; +// SArray *aBlockL; // SArray +// int32_t iBlockL; +// SBlockData bData; +// int32_t iRow; +// } SLDataIter; + +// typedef struct { +// SRBTreeNode *pNode; +// SRBTree rbt; +// } SDataMerger; + +// static void tDataMergerInit(SDataMerger *pMerger, SArray *aNodeP) { +// pMerger->pNode = NULL; +// tRBTreeCreate(&pMerger->rbt, tRowInfoCmprFn); +// for (int32_t iNode = 0; iNode < taosArrayGetSize(aNodeP); iNode++) { +// SRBTreeNode *pNode = (SRBTreeNode *)taosArrayGetP(aNodeP, iNode); + +// pNode = tRBTreePut(&pMerger->rbt, pNode); +// ASSERT(pNode); +// } +// } + +// static int32_t tDataMergeNext(SDataMerger *pMerger, SRowInfo **ppInfo) { +// int32_t code = 0; + +// if (pMerger->pNode) { +// // next current iter +// SLDataIter *pIter = (SLDataIter *)pMerger->pNode->payload; + +// pIter->iRow++; +// if (pIter->iRow < pIter->bData.nRow) { +// pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; +// pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); +// } else { +// pIter->iBlockL++; +// if (pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) { +// SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); +// code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pBlockL, &pIter->bData); +// if (code) goto _exit; + +// pIter->iRow = 0; +// pIter->rowInfo.suid = pIter->bData.suid; +// pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; +// pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, 0); +// } else { +// pMerger->pNode = NULL; +// } +// } + +// SRBTreeNode *pMinNode = tRBTreeMin(&pMerger->rbt); +// if (pMerger->pNode && pMinNode) { +// int32_t c = tRowInfoCmprFn(pMerger->pNode->payload, pMinNode->payload); +// if (c > 0) { +// pMerger->pNode = tRBTreePut(&pMerger->rbt, pMerger->pNode); +// ASSERT(pMerger->pNode); +// pMerger->pNode = NULL; +// } else { +// ASSERT(c); +// } +// } +// } + +// if (pMerger->pNode == NULL) { +// pMerger->pNode = tRBTreeMin(&pMerger->rbt); +// if (pMerger->pNode) { +// tRBTreeDrop(&pMerger->rbt, pMerger->pNode); +// } +// } + +// if (pMerger->pNode) { +// *ppInfo = &((SLDataIter *)pMerger->pNode->payload)[0].rowInfo; +// } else { +// *ppInfo = NULL; +// } + +// _exit: +// return code; +// } + +// typedef struct { +// STsdb *pTsdb; +// int8_t maxLast; +// int32_t minRow; +// int32_t maxRow; +// int8_t cmprAlg; +// int64_t commitID; +// STsdbFS fs; +// struct { +// SDataFReader *pReader; +// SArray *aBlockIdx; +// SLDataIter *aLDataiter[TSDB_MAX_LAST_FILE]; +// SDataMerger merger; +// } dReader; +// struct { +// SDataFWriter *pWriter; +// SArray *aBlockIdx; +// SArray *aBlockL; +// SBlockData bData; +// SBlockData bDatal; +// } dWriter; +// } STsdbMerger; + +// static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) { +// int32_t code = 0; +// STsdb *pTsdb = pMerger->pTsdb; + +// // reader +// code = tsdbDataFReaderOpen(&pMerger->dReader.pReader, pTsdb, pSet); +// if (code) goto _err; + +// code = tsdbReadBlockIdx(pMerger->dReader.pReader, pMerger->dReader.aBlockIdx); +// if (code) goto _err; + +// pMerger->dReader.merger.pNode = NULL; +// tRBTreeCreate(&pMerger->dReader.merger.rbt, tRowInfoCmprFn); +// for (int8_t iLast = 0; iLast < pSet->nLastF; iLast++) { +// SRBTreeNode *pNode = (SRBTreeNode *)taosMemoryCalloc(1, sizeof(*pNode) + sizeof(SLDataIter)); +// if (pNode == NULL) { +// code = TSDB_CODE_OUT_OF_MEMORY; +// goto _err; +// } + +// SLDataIter *pIter = (SLDataIter *)pNode->payload; +// pIter->pReader = pMerger->dReader.pReader; +// pIter->iLast = iLast; + +// pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); +// if (pIter->aBlockL == NULL) { +// code = TSDB_CODE_OUT_OF_MEMORY; +// goto _err; +// } +// code = tBlockDataCreate(&pIter->bData); +// if (code) goto _err; + +// code = tsdbReadBlockL(pMerger->dReader.pReader, iLast, pIter->aBlockL); +// if (code) goto _err; + +// if (taosArrayGetSize(pIter->aBlockL) == 0) continue; +// pIter->iBlockL = 0; + +// SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, 0); +// code = tsdbReadLastBlockEx(pMerger->dReader.pReader, iLast, pBlockL, &pIter->bData); +// if (code) goto _err; + +// pIter->iRow = 0; +// pIter->rowInfo.suid = pIter->bData.suid; +// pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; +// pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, 0); + +// pNode = tRBTreePut(&pMerger->dReader.merger.rbt, pNode); +// ASSERT(pNode); + +// pMerger->dReader.aLDataiter[iLast] = pIter; +// } + +// // writer +// SHeadFile fHead = {.commitID = pMerger->commitID}; +// SDataFile fData = *pSet->pDataF; +// SSmaFile fSma = *pSet->pSmaF; +// SLastFile fLast = {.commitID = pMerger->commitID}; +// SDFileSet wSet = {.diskId = pSet->diskId, +// .fid = pSet->fid, +// .nLastF = 1, +// .pHeadF = &fHead, +// .pDataF = &fData, +// .pSmaF = &fSma, +// .aLastF[0] = &fLast}; +// code = tsdbDataFWriterOpen(&pMerger->dWriter.pWriter, pTsdb, &wSet); +// if (code) goto _err; + +// return code; + +// _err: +// tsdbError("vgId:%d tsdb merge file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); +// return code; +// } + +// static int32_t tsdbMergeFileDataEnd(STsdbMerger *pMerger) { +// int32_t code = 0; +// STsdb *pTsdb = pMerger->pTsdb; + +// // write aBlockIdx +// code = tsdbWriteBlockIdx(pMerger->dWriter.pWriter, pMerger->dWriter.aBlockIdx); +// if (code) goto _err; + +// // write aBlockL +// code = tsdbWriteBlockL(pMerger->dWriter.pWriter, pMerger->dWriter.aBlockL); +// if (code) goto _err; + +// // update file header +// code = tsdbUpdateDFileSetHeader(pMerger->dWriter.pWriter); +// if (code) goto _err; + +// // upsert SDFileSet +// code = tsdbFSUpsertFSet(&pMerger->fs, &pMerger->dWriter.pWriter->wSet); +// if (code) goto _err; + +// // close and sync +// code = tsdbDataFWriterClose(&pMerger->dWriter.pWriter, 1); +// if (code) goto _err; + +// if (pMerger->dReader.pReader) { +// code = tsdbDataFReaderClose(&pMerger->dReader.pReader); +// if (code) goto _err; +// } + +// return code; + +// _err: +// tsdbError("vgId:%d tsdb merge file data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); +// return code; +// } + +// static int32_t tsdbMergeFileData(STsdbMerger *pMerger, SDFileSet *pSet) { +// int32_t code = 0; +// STsdb *pTsdb = pMerger->pTsdb; + +// // start +// code = tsdbMergeFileDataStart(pMerger, pSet); +// if (code) goto _err; + +// // impl +// SRowInfo *pInfo; +// TABLEID id = {0}; +// while (true) { +// code = tDataMergeNext(&pMerger->dReader.merger, &pInfo); +// if (code) goto _err; + +// if (pInfo == NULL) { +// if (pMerger->dWriter.bData.nRow > 0) { +// // TODO +// } + +// if (pMerger->dWriter.bDatal.nRow > 0) { +// // TODO +// } + +// break; +// } + +// if (id.suid != pInfo->suid || id.uid != pInfo->uid) { +// while (true) { +// // move commit the head data +// } + +// // prepare to commit next +// } + +// code = tBlockDataAppendRow(&pMerger->dWriter.bData, &pInfo->row, NULL, pInfo->uid); +// if (code) goto _err; + +// if (pMerger->dWriter.bData.nRow >= pMerger->maxRow * 4 / 5) { +// // code = tsdbCommitDataBlock(); +// if (code) goto _err; +// } +// } + +// // end +// code = tsdbMergeFileDataEnd(pMerger); +// if (code) goto _err; + +// return code; + +// _err: +// tsdbError("vgId:%d tsdb merge file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); +// return code; +// } + +// static int32_t tsdbStartMerge(STsdbMerger *pMerger, STsdb *pTsdb) { +// int32_t code = 0; + +// pMerger->pTsdb = pTsdb; +// pMerger->maxLast = TSDB_DEFAULT_LAST_FILE; +// pMerger->commitID = ++pTsdb->pVnode->state.commitID; +// code = tsdbFSCopy(pTsdb, &pMerger->fs); +// if (code) goto _exit; + +// // reader +// pMerger->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); +// if (pMerger->dReader.aBlockIdx == NULL) { +// code = TSDB_CODE_OUT_OF_MEMORY; +// goto _exit; +// } +// // for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { +// // pMerger->dReader.aBlockL[iLast] = taosArrayInit(0, sizeof(SBlockL)); +// // if (pMerger->dReader.aBlockL[iLast] == NULL) { +// // code = TSDB_CODE_OUT_OF_MEMORY; +// // goto _exit; +// // } +// // } + +// // writer +// pMerger->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); +// if (pMerger->dWriter.aBlockIdx == NULL) { +// code = TSDB_CODE_OUT_OF_MEMORY; +// goto _exit; +// } +// pMerger->dWriter.aBlockL = taosArrayInit(0, sizeof(SBlockL)); +// if (pMerger->dWriter.aBlockL == NULL) { +// code = TSDB_CODE_OUT_OF_MEMORY; +// goto _exit; +// } + +// _exit: +// return code; +// } + +// static int32_t tsdbEndMerge(STsdbMerger *pMerger) { +// int32_t code = 0; +// STsdb *pTsdb = pMerger->pTsdb; + +// code = tsdbFSCommit1(pTsdb, &pMerger->fs); +// if (code) goto _err; + +// taosThreadRwlockWrlock(&pTsdb->rwLock); +// code = tsdbFSCommit2(pTsdb, &pMerger->fs); +// if (code) { +// taosThreadRwlockUnlock(&pTsdb->rwLock); +// goto _err; +// } +// taosThreadRwlockUnlock(&pTsdb->rwLock); + +// // writer +// taosArrayDestroy(pMerger->dWriter.aBlockL); +// taosArrayDestroy(pMerger->dWriter.aBlockIdx); + +// // reader +// // for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { +// // taosArrayDestroy(pMerger->dReader.aBlockL[iLast]); +// // } +// taosArrayDestroy(pMerger->dReader.aBlockIdx); +// tsdbFSDestroy(&pMerger->fs); + +// return code; + +// _err: +// tsdbError("vgId:%d, tsdb end merge failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); +// return code; +// } + +// int32_t tsdbMerge(STsdb *pTsdb) { +// int32_t code = 0; +// STsdbMerger merger = {0}; + +// code = tsdbStartMerge(&merger, pTsdb); +// if (code) goto _err; + +// for (int32_t iSet = 0; iSet < taosArrayGetSize(merger.fs.aDFileSet); iSet++) { +// SDFileSet *pSet = (SDFileSet *)taosArrayGet(merger.fs.aDFileSet, iSet); +// if (pSet->nLastF < merger.maxLast) continue; + +// code = tsdbMergeFileData(&merger, pSet); +// if (code) goto _err; +// } + +// code = tsdbEndMerge(&merger); +// if (code) goto _err; + +// return code; + +// _err: +// tsdbError("vgId:%d tsdb merge failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); +// return code; +// } \ No newline at end of file