diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index b34facb3ed..8469996ccd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -62,6 +62,13 @@ typedef struct { int32_t fid; SDFileSet *pDFileSet; + // Tombstone + SDelFReader *pDelFReader; + SArray *aDelIdx; // SArray + SArray *aDelData; // SArray + SArray *aSkyLine; // SArray + TSDBKEY *aTSDBKEY; + // Reader SDataFReader *pReader; STsdbDataIter *iterList; // list of iterators @@ -314,6 +321,33 @@ static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) { code = tBlockDataCreate(&pCompactor->bData); TSDB_CHECK_CODE(code, lino, _exit); + // tombstone + if (pCompactor->fs.pDelFile) { + code = tsdbDelFReaderOpen(&pCompactor->pDelFReader, pCompactor->fs.pDelFile, pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + + pCompactor->aDelIdx = taosArrayInit(0, sizeof(SDelIdx)); + if (pCompactor->aDelIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + pCompactor->aDelData = taosArrayInit(0, sizeof(SDelData)); + if (pCompactor->aDelData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + pCompactor->aSkyLine = taosArrayInit(0, sizeof(TSDBKEY)); + if (pCompactor->aSkyLine == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbReadDelIdx(pCompactor->pDelFReader, pCompactor->aDelIdx); + TSDB_CHECK_CODE(code, lino, _exit); + } + _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); @@ -322,6 +356,10 @@ _exit: } static void tsdbEndCompact(STsdbCompactor *pCompactor) { + taosArrayDestroy(pCompactor->aSkyLine); + taosArrayDestroy(pCompactor->aDelData); + taosArrayDestroy(pCompactor->aDelIdx); + tsdbDelFReaderClose(&pCompactor->pDelFReader); tsdbFSDestroy(&pCompactor->fs); tBlockDataDestroy(&pCompactor->bData); tDestroyTSchema(pCompactor->tbSkm.pTSchema); @@ -451,6 +489,21 @@ _exit: return code; } +static int32_t tDelIdxCmprFn(const SDelIdx *pDelIdx1, const SDelIdx *pDelIdx2) { + if (pDelIdx1->suid < pDelIdx2->suid) { + return -1; + } else if (pDelIdx1->suid > pDelIdx2->suid) { + return 1; + } + + if (pDelIdx1->uid < pDelIdx2->uid) { + return -1; + } else if (pDelIdx1->uid > pDelIdx2->uid) { + return 1; + } + + return 0; +} static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) { int32_t code = 0; int32_t lino = 0; @@ -466,8 +519,15 @@ static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) { if (pCompactor->pIter) { SRowInfo *pRowInfo = &pCompactor->pIter->rowInfo; - // Table exists, just break out - if (pRowInfo->uid == pCompactor->tbSkm.uid) break; + // Table exists + if (pRowInfo->uid == pCompactor->tbSkm.uid) { + if (pCompactor->aTSDBKEY) { + // TODO: check if the row is deleted. if deleted, continue, else break + ASSERT(0); + } else { + break; + } + } SMetaInfo info; if (pRowInfo->suid) { // child table @@ -523,6 +583,25 @@ static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) { } } + // load delData and build the skyline + if (pCompactor->pDelFReader) { + SDelIdx *pDelIdx = + taosArraySearch(pCompactor->aDelIdx, &(SDelIdx){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, + (__compar_fn_t)tDelIdxCmprFn, TD_EQ); + if (pDelIdx) { + code = tsdbReadDelData(pCompactor->pDelFReader, pDelIdx, pCompactor->aDelData); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbBuildDeleteSkyline(pCompactor->aDelData, 0, taosArrayGetSize(pCompactor->aDelData) - 1, + pCompactor->aSkyLine); + TSDB_CHECK_CODE(code, lino, _exit); + + pCompactor->aTSDBKEY = (TSDBKEY *)TARRAY_DATA(pCompactor->aDelData); + } else { + pCompactor->aTSDBKEY = NULL; + } + } + break; } else { // iter end, just break out