From a885d6e3d2b6c91ed58be2554c1758b05500c9ed Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 7 Feb 2023 14:08:20 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbCompact.c | 589 +++++++++++++++------- 1 file changed, 411 insertions(+), 178 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 6948409ca0..867a20c36f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -14,6 +14,7 @@ */ #include "tsdb.h" + #if 0 #define TSDB_ITER_TYPE_MEM 0x0 @@ -53,41 +54,6 @@ typedef struct STsdbDataIter { #define TSDB_DATA_ITER_FROM_RBTN(N) ((STsdbDataIter *)((char *)N - offsetof(STsdbDataIter, n))) -typedef struct { - STsdb *pTsdb; - int64_t cid; - int8_t cmprAlg; - int32_t maxRows; - int32_t minRows; - STsdbFS fs; - int32_t fid; - SDFileSet *pDFileSet; - - // Tombstone - SDelFReader *pDelFReader; - SArray *aDelIdx; // SArray - SArray *aDelData; // SArray - SArray *aSkyLine; // SArray - TSDBKEY *aTSDBKEY; - int32_t iKey; - TSDBKEY sKey; - - // Reader - SDataFReader *pReader; - STsdbDataIter *iterList; // list of iterators - SRBTree rtree; - STsdbDataIter *pIter; - SBlockData bData; - SSkmInfo tbSkm; - - // Writer - SDataFWriter *pWriter; - SArray *aBlockIdx; // SArray - SMapData mDataBlk; // SMapData - SArray *aSttBlk; // SArray - TABLEID tableId; -} STsdbCompactor; - #define TSDB_FLG_DEEP_COMPACT 0x1 // ITER ========================= @@ -306,57 +272,6 @@ _exit: } // COMPACT ========================= -static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) { - int32_t code = 0; - int32_t lino = 0; - - pCompactor->pTsdb = pTsdb; - pCompactor->cid = 0; // TODO - pCompactor->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; - pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows; - pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows; - - code = tsdbFSCopy(pTsdb, &pCompactor->fs); - TSDB_CHECK_CODE(code, lino, _exit); - - pCompactor->fid = INT32_MIN; - - code = tBlockDataCreate(&pCompactor->bData); - TSDB_CHECK_CODE(code, lino, _exit); - - // tombstone - if (pCompactor->fs.pDelFile) { - code = tsdbDelFReaderOpen(&pCompactor->pDelFReader, pCompactor->fs.pDelFile, pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); - - pCompactor->aDelIdx = taosArrayInit(0, sizeof(SDelIdx)); - if (pCompactor->aDelIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - pCompactor->aDelData = taosArrayInit(0, sizeof(SDelData)); - if (pCompactor->aDelData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - pCompactor->aSkyLine = taosArrayInit(0, sizeof(TSDBKEY)); - if (pCompactor->aSkyLine == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbReadDelIdx(pCompactor->pDelFReader, pCompactor->aDelIdx); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} static void tsdbEndCompact(STsdbCompactor *pCompactor) { taosArrayDestroy(pCompactor->aSkyLine); @@ -835,101 +750,311 @@ _exit: return code; } -static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) { +#endif + +extern int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo); +extern int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg); +extern int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg); + +typedef struct { + STsdb *pTsdb; + int64_t commitID; + int8_t cmprAlg; + int32_t maxRows; + int32_t minRows; + + STsdbFS fs; + + int32_t fid; + TABLEID tbid; + SSkmInfo tbSkm; + + // Tombstone + SDelFReader *pDelFReader; + SArray *aDelIdx; // SArray + SArray *aDelData; // SArray + SArray *aSkyLine; // SArray + TSDBKEY *aTSDBKEY; + int32_t iKey; + TSDBKEY sKey; + + // Reader + SDataFReader *pReader; + STsdbDataIter2 *iterList; // list of iterators + STsdbDataIter2 *pIter; + SRBTree rbt; + + // Writer + SDataFWriter *pWriter; + SArray *aBlockIdx; // SArray + SMapData mDataBlk; // SMapData + SArray *aSttBlk; // SArray + SBlockData bData; + SBlockData sData; +} STsdbCompactor; + +static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEID *pId) { int32_t code = 0; int32_t lino = 0; - // open compactor - code = tsdbOpenCompactor(pCompactor); + pCompactor->tbid = *pId; + + // TODO + + // update table schema if necessary (TODO) + code = tsdbUpdateTableSchema(pCompactor->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pCompactor->tbSkm); TSDB_CHECK_CODE(code, lino, _exit); - // do compact - SRowInfo *pRowInfo; - STSchema *pTSchema; - int64_t nRow = 0; + tMapDataReset(&pCompactor->mDataBlk); - code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema); + code = tBlockDataInit(&pCompactor->bData, pId, pCompactor->tbSkm.pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); - if (pRowInfo && (code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0))) { - TSDB_CHECK_CODE(code, lino, _exit); - pCompactor->tableId.suid = pRowInfo->suid; - pCompactor->tableId.uid = pRowInfo->uid; - } - - while (pRowInfo) { - // if suid changed - if (pCompactor->tableId.suid != pRowInfo->suid) { - code = tsdbCompactWriteBlockData(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbCompactWriteDataBlk(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - - pCompactor->tableId.suid = pRowInfo->suid; - pCompactor->tableId.uid = pRowInfo->uid; - } - - // if uid changed - if (pCompactor->tableId.uid != pRowInfo->uid) { - // if need to write the block data - bool init = false; - if (pCompactor->bData.suid == 0) { - code = tsdbCompactWriteBlockData(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - init = true; - } else if (pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows) { - code = tsdbCompactWriteBlockData(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // write SDataBlk - code = tsdbCompactWriteDataBlk(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - - // init block data if need - if (init && (code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0))) { - TSDB_CHECK_CODE(code, lino, _exit); - } - - pCompactor->tableId.suid = pRowInfo->suid; - pCompactor->tableId.uid = pRowInfo->uid; - } - - // if append/merge the row causes nRow exceed maxRows - if (tBlockDataTryUpsertRow(&pCompactor->bData, &pRowInfo->row, pRowInfo->uid) > pCompactor->maxRows) { - code = tsdbCompactWriteBlockData(pCompactor); + if (!TABLE_SAME_SCHEMA(pCompactor->tbid.suid, pCompactor->tbid.uid, pId->suid, pId->uid)) { + if (pCompactor->sData.nRow > 0) { + code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg); TSDB_CHECK_CODE(code, lino, _exit); } - // append/merge the row - code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid); - TSDB_CHECK_CODE(code, lino, _exit); - - // iter to the next row - code = tsdbCompactNextRow(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema); + code = tBlockDataInit(&pCompactor->sData, pId /* TODO */, pCompactor->tbSkm.pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); } - // handle remain data - code = tsdbCompactWriteBlockData(pCompactor); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, + tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, pId->suid, + pId->uid); + } + return code; +} + +static int32_t tsdbCompactWriteTableDataEnd(STsdbCompactor *pCompactor) { + int32_t code = 0; + int32_t lino = 0; + + if (pCompactor->bData.nRow > 0) { + if (pCompactor->bData.nRow < pCompactor->minRows) { + for (int32_t iRow = 0; iRow < pCompactor->bData.nRow; iRow++) { + code = tBlockDataAppendRow(&pCompactor->sData, &tsdbRowFromBlockData(&pCompactor->bData, iRow), NULL, + pCompactor->tbid.uid); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pCompactor->sData.nRow >= pCompactor->maxRows) { + code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + tBlockDataClear(&pCompactor->bData); + } else { + code = tsdbWriteDataBlock(pCompactor->pWriter, &pCompactor->bData, &pCompactor->mDataBlk, pCompactor->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + if (pCompactor->mDataBlk.nItem) { + SBlockIdx *pBlockIdx = (SBlockIdx *)taosArrayReserve(pCompactor->aBlockIdx, 1); + if (pBlockIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + pBlockIdx->suid = pCompactor->tbid.suid; + pBlockIdx->uid = pCompactor->tbid.uid; + + code = tsdbWriteDataBlk(pCompactor->pWriter, &pCompactor->mDataBlk, pBlockIdx); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, + tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, + pCompactor->tbid.suid, pCompactor->tbid.uid); + } + return code; +} + +static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *pRowInfo) { + int32_t code = 0; + int32_t lino = 0; + + SRowInfo rInfo; + if (pRowInfo == NULL) { + rInfo.suid = INT64_MAX; + rInfo.uid = INT64_MAX; + // rInfo.row = TSDBORW_V; + pRowInfo = &rInfo; + } + + if (pRowInfo->uid != pCompactor->tbid.uid) { + if (pCompactor->tbid.uid) { + code = tsdbCompactWriteTableDataEnd(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbCompactWriteTableDataStart(pCompactor, (TABLEID *)pRowInfo); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tBlockDataAppendRow(&pCompactor->bData, &pRowInfo->row, NULL, pRowInfo->uid); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbCompactWriteDataBlk(pCompactor); + if (pCompactor->bData.nRow >= pCompactor->maxRows) { + code = tsdbWriteDataBlock(pCompactor->pWriter, &pCompactor->bData, &pCompactor->mDataBlk, pCompactor->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, + tstrerror(code)); + } else if (pRowInfo) { + tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64, + TD_VID(pCompactor->pTsdb->pVnode), __func__, pRowInfo->suid, pRowInfo->uid, TSDBROW_TS(&pRowInfo->row), + TSDBROW_VERSION(&pRowInfo->row)); + } + return code; +} + +static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor, SRowInfo **ppRowInfo) { + int32_t code = 0; + int32_t lino = 0; + + if (pCompactor->pIter) { + code = tsdbDataIterNext2(pCompactor->pIter, NULL /* TODO */); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) { + pCompactor->pIter = NULL; + } else { + SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rbt); + if (pNode) { + int32_t c = tsdbDataIterCmprFn(&pCompactor->pIter->rbtn, pNode); + if (c > 0) { + tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn); + pCompactor->pIter = NULL; + } else if (c == 0) { + ASSERT(0); + } + } + } + } + + if (pCompactor->pIter == NULL) { + SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rbt); + if (pNode) { + tRBTreeDrop(&pCompactor->rbt, pNode); + pCompactor->pIter = TSDB_RBTN_TO_DATA_ITER(pNode); + } + } + + if (ppRowInfo) { + if (pCompactor->pIter) { + *ppRowInfo = &pCompactor->pIter->rowInfo; + } else { + *ppRowInfo = NULL; + } + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, + tstrerror(code)); + } + return code; +} + +static int32_t tsdbCompactFileSetStart(STsdbCompactor *pCompactor, SDFileSet *pSet) { + int32_t code = 0; + int32_t lino = 0; + + pCompactor->fid = pSet->fid; + pCompactor->tbid = (TABLEID){0}; + + /* reader */ + code = tsdbDataFReaderOpen(&pCompactor->pReader, pCompactor->pTsdb, pSet); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbOpenDataFileDataIter(pCompactor->pReader, &pCompactor->pIter); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pCompactor->pIter) { + pCompactor->pIter->next = pCompactor->iterList; + pCompactor->iterList = pCompactor->pIter; + } + + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + code = tsdbOpenSttFileDataIter(pCompactor->pReader, iStt, &pCompactor->pIter); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pCompactor->pIter) { + pCompactor->pIter->next = pCompactor->iterList; + pCompactor->iterList = pCompactor->pIter; + } + } + + pCompactor->pIter = NULL; + tRBTreeCreate(&pCompactor->rbt, tsdbDataIterCmprFn); + + /* writer */ + code = tsdbDataFWriterOpen(&pCompactor->pWriter, pCompactor->pTsdb, + &(SDFileSet){.fid = pCompactor->fid, + .diskId = pSet->diskId, + .pHeadF = &(SHeadFile){.commitID = pCompactor->commitID}, + .pDataF = &(SDataFile){.commitID = pCompactor->commitID}, + .pSmaF = &(SSmaFile){.commitID = pCompactor->commitID}, + .nSttF = 1, + .aSttF = {&(SSttFile){.commitID = pCompactor->commitID}}}); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pCompactor->aBlockIdx) { + taosArrayClear(pCompactor->aBlockIdx); + } else if ((pCompactor->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + tMapDataReset(&pCompactor->mDataBlk); + + if (pCompactor->aSttBlk) { + taosArrayClear(pCompactor->aSttBlk); + } else if ((pCompactor->aSttBlk = taosArrayInit(0, sizeof(SSttBlk))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + tBlockDataReset(&pCompactor->bData); + tBlockDataReset(&pCompactor->sData); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, + tstrerror(code), pCompactor->fid); + } else { + tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->fid); + } + return code; +} + +static int32_t tsdbCompactFileSetEnd(STsdbCompactor *pCompactor) { + int32_t code = 0; + int32_t lino = 0; + + /* finish remaining data (TODO) */ + + /* update files */ + code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk); - TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbUpdateDFileSetHeader(pCompactor->pWriter); TSDB_CHECK_CODE(code, lino, _exit); @@ -939,47 +1064,155 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) { code = tsdbDataFWriterClose(&pCompactor->pWriter, 1); TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbDataFReaderClose(&pCompactor->pReader); + TSDB_CHECK_CODE(code, lino, _exit); + + /* do clear */ + while ((pCompactor->pIter = pCompactor->iterList) != NULL) { + pCompactor->iterList = pCompactor->pIter->next; + tsdbCloseDataIter2(pCompactor->pIter); + } + + tBlockDataReset(&pCompactor->bData); + tBlockDataReset(&pCompactor->sData); + _exit: - // close compactor - tsdbCloseCompactor(pCompactor); + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, + tstrerror(code), pCompactor->fid); + } else { + tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->fid); + } return code; } + +static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor, SDFileSet *pSet) { + int32_t code = 0; + int32_t lino = 0; + + // start compact + code = tsdbCompactFileSetStart(pCompactor, pSet); + TSDB_CHECK_CODE(code, lino, _exit); + + // do compact + SRowInfo *pRowInfo; + for (;;) { + code = tsdbCompactNextRow(pCompactor, &pRowInfo); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbCompactWriteTableData(pCompactor, pRowInfo); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pRowInfo == NULL) break; + } + + // end compact + code = tsdbCompactFileSetEnd(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + return code; +} + +static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) { + int32_t code = 0; + int32_t lino = 0; + + pCompactor->pTsdb = pTsdb; + pCompactor->commitID = 0; // TODO + pCompactor->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; + pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows; + pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows; + pCompactor->fid = INT32_MIN; + + code = tsdbFSCopy(pTsdb, &pCompactor->fs); + TSDB_CHECK_CODE(code, lino, _exit); + + /* tombstone (TODO ) */ +#if 0 + if (pCompactor->fs.pDelFile) { + code = tsdbDelFReaderOpen(&pCompactor->pDelFReader, pCompactor->fs.pDelFile, pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + + pCompactor->aDelIdx = taosArrayInit(0, sizeof(SDelIdx)); + if (pCompactor->aDelIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + pCompactor->aDelData = taosArrayInit(0, sizeof(SDelData)); + if (pCompactor->aDelData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + pCompactor->aSkyLine = taosArrayInit(0, sizeof(TSDBKEY)); + if (pCompactor->aSkyLine == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbReadDelIdx(pCompactor->pDelFReader, pCompactor->aDelIdx); + TSDB_CHECK_CODE(code, lino, _exit); + } #endif + /* reader */ + + /* writer */ + code = tBlockDataCreate(&pCompactor->bData); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tBlockDataCreate(&pCompactor->sData); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s, commit ID:%" PRId64, TD_VID(pTsdb->pVnode), __func__, lino, + tstrerror(code), pCompactor->commitID); + } else { + tsdbInfo("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pTsdb->pVnode), __func__, pCompactor->commitID); + } + return code; +} + int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { int32_t code = 0; int32_t lino = 0; -#if 0 STsdbCompactor *pCompactor = &(STsdbCompactor){0}; // begin compact code = tsdbBeginCompact(pTsdb, pCompactor); TSDB_CHECK_CODE(code, lino, _exit); - // do compact each file set + // loop to compact each file set while (true) { - pCompactor->pDFileSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid}, - tDFileSetCmprFn, TD_GT); - if (pCompactor->pDFileSet == NULL) break; + SDFileSet *pSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid}, + tDFileSetCmprFn, TD_GT); + if (pSet == NULL) { + pCompactor->fid = INT32_MAX; + break; + } - pCompactor->fid = pCompactor->pDFileSet->fid; - code = tsdbCompactFileSet(pCompactor); + code = tsdbCompactFileSet(pCompactor, pSet); TSDB_CHECK_CODE(code, lino, _exit); } +#if 0 + code = tsdbFSUpsertDelFile(&pCompactor->fs, NULL); TSDB_CHECK_CODE(code, lino, _exit); -_exit: - // commit/abort compact - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - tsdbAbortCompact(pCompactor); - } else { - tsdbCommitCompact(pCompactor); - } - tsdbEndCompact(pCompactor); #endif +_exit: + // // commit/abort compact + // if (code) { + // tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + // tsdbAbortCompact(pCompactor); + // } else { + // tsdbCommitCompact(pCompactor); + // } + // tsdbEndCompact(pCompactor); return code; }