From 47d493e85148ea8e03e836b0cf1c1becd3914a73 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 30 Dec 2022 14:49:55 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 1 + source/dnode/vnode/src/tsdb/tsdbCompact.c | 187 +++++++++++----------- 2 files changed, 95 insertions(+), 93 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 1d6f37dabc..de764045b2 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -638,6 +638,7 @@ struct SDataFReader { uint8_t *aBuf[3]; }; +// NOTE: do NOT change the order of the fields typedef struct { int64_t suid; int64_t uid; diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 9843f8b81c..9ee8714165 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -688,13 +688,6 @@ static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) { STsdb *pTsdb = pCompactor->pTsdb; - // next compact file - pCompactor->pDFileSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid}, - tDFileSetCmprFn, TD_GT); - if (pCompactor->pDFileSet == NULL) goto _exit; - - pCompactor->fid = pCompactor->pDFileSet->fid; - // reader code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet); TSDB_CHECK_CODE(code, lino, _exit); @@ -841,111 +834,119 @@ _exit: return code; } +static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) { + int32_t code = 0; + int32_t lino = 0; + + // open compactor + code = tsdbOpenCompactor(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + + // do compact + SRowInfo *pRowInfo; + STSchema *pTSchema; + int64_t nRow = 0; + + code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pRowInfo && (code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0))) { + TSDB_CHECK_CODE(code, lino, _exit); + } + + while (pRowInfo) { + nRow++; + + if ((pCompactor->tableId.suid != pRowInfo->suid) || // different super table + (pCompactor->tableId.uid != pRowInfo->uid && + (pRowInfo->suid == 0 || + pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows)) // different table + ) { + code = tsdbCompactWriteBlockData(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbCompactWriteDataBlk(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // append row to block data + code = tBlockDataAppendRowEx(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid); + TSDB_CHECK_CODE(code, lino, _exit); + + pCompactor->tableId.suid = pRowInfo->suid; + pCompactor->tableId.uid = pRowInfo->uid; + + // check if block data is full + if (pCompactor->bData.nRow >= pCompactor->maxRows) { + code = tsdbCompactWriteBlockData(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // iterate to next row + code = tsdbCompactNextRow(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // handle remain data + code = tsdbCompactWriteBlockData(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbCompactWriteDataBlk(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbUpdateDFileSetHeader(pCompactor->pWriter); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbFSUpsertFSet(&pCompactor->fs, &pCompactor->pWriter->wSet); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbDataFWriterClose(&pCompactor->pWriter, 1); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + // close compactor + tsdbCloseCompactor(pCompactor); + return code; +} + int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { int32_t code = 0; int32_t lino = 0; - // Check if can do compact (TODO) - - // Do compact STsdbCompactor *pCompactor = &(STsdbCompactor){0}; + // begin compact code = tsdbBeginCompact(pTsdb, pCompactor); TSDB_CHECK_CODE(code, lino, _exit); + // do compact each file set while (true) { - code = tsdbOpenCompactor(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - + pCompactor->pDFileSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid}, + tDFileSetCmprFn, TD_GT); if (pCompactor->pDFileSet == NULL) break; - // loop to merge row by row - SRowInfo *pRowInfo; - STSchema *pTSchema; - int64_t nRow = 0; - - code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema); + pCompactor->fid = pCompactor->pDFileSet->fid; + code = tsdbCompactFileSet(pCompactor); TSDB_CHECK_CODE(code, lino, _exit); - - if (pRowInfo && (code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, - pTSchema, NULL, 0))) { - TSDB_CHECK_CODE(code, lino, _exit); - } - - while (pRowInfo) { - nRow++; - - if (pCompactor->bData.suid == 0 && pCompactor->bData.uid == 0) { // init the block data if not initialized yet - code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema, - NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - if ((pCompactor->tableId.suid != pRowInfo->suid) || // different super table - (pCompactor->tableId.uid != pRowInfo->uid && - (pRowInfo->suid == 0 || - pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows)) // different table - ) { - code = tsdbCompactWriteBlockData(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbCompactWriteDataBlk(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema, - NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - // append row to block data - code = tBlockDataAppendRowEx(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid); - TSDB_CHECK_CODE(code, lino, _exit); - - pCompactor->tableId.suid = pRowInfo->suid; - pCompactor->tableId.uid = pRowInfo->uid; - - // check if block data is full - if (pCompactor->bData.nRow >= pCompactor->maxRows) { - code = tsdbCompactWriteBlockData(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // iterate to next row - code = tsdbCompactNextRow(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbCompactWriteBlockData(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbCompactWriteDataBlk(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbUpdateDFileSetHeader(pCompactor->pWriter); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbFSUpsertFSet(&pCompactor->fs, &pCompactor->pWriter->wSet); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbDataFWriterClose(&pCompactor->pWriter, 1); - TSDB_CHECK_CODE(code, lino, _exit); - - tsdbCloseCompactor(pCompactor); } code = tsdbFSUpsertDelFile(&pCompactor->fs, NULL); TSDB_CHECK_CODE(code, lino, _exit); _exit: + // commit/abort compact if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbAbortCompact(pCompactor);