diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 867a20c36f..7acacba085 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -273,20 +273,6 @@ _exit: // COMPACT ========================= -static void tsdbEndCompact(STsdbCompactor *pCompactor) { - taosArrayDestroy(pCompactor->aSkyLine); - taosArrayDestroy(pCompactor->aDelData); - taosArrayDestroy(pCompactor->aDelIdx); - tsdbDelFReaderClose(&pCompactor->pDelFReader); - tsdbFSDestroy(&pCompactor->fs); - tBlockDataDestroy(&pCompactor->bData); - tDestroyTSchema(pCompactor->tbSkm.pTSchema); - pCompactor->tbSkm.pTSchema = NULL; - taosArrayDestroy(pCompactor->aBlockIdx); - tMapDataClear(&pCompactor->mDataBlk); - taosArrayDestroy(pCompactor->aSttBlk); -} - static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) { int32_t code = 0; int32_t lino = 0; @@ -423,43 +409,6 @@ static int32_t tDelIdxCmprFn(const SDelIdx *pDelIdx1, const SDelIdx *pDelIdx2) { return 0; } -static bool tsdbCompactRowIsDeleted(STsdbCompactor *pCompactor, TSDBROW *pRow) { - TSDBKEY tKey = TSDBROW_KEY(pRow); - - while (tKey.ts > pCompactor->sKey.ts) { - pCompactor->sKey.version = pCompactor->aTSDBKEY[pCompactor->iKey].version; - pCompactor->iKey++; - if (pCompactor->iKey < taosArrayGetSize(pCompactor->aSkyLine)) { - pCompactor->sKey.ts = pCompactor->aTSDBKEY[pCompactor->iKey].ts; - } else { - pCompactor->sKey.ts = TSKEY_MAX; - } - } - - if (tKey.ts < pCompactor->sKey.ts) { - if (tKey.version > pCompactor->sKey.version) { - return false; - } else { - return true; - } - } else if (tKey.ts == pCompactor->sKey.ts) { - int64_t version; - if (pCompactor->iKey < taosArrayGetSize(pCompactor->aSkyLine)) { - version = TMAX(pCompactor->sKey.version, pCompactor->aTSDBKEY[pCompactor->iKey].version); - } else { - version = pCompactor->sKey.version; - } - - if (tKey.version > version) { - return false; - } else { - return true; - } - } - - return false; -} - static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) { int32_t code = 0; int32_t lino = 0; @@ -774,9 +723,9 @@ typedef struct { SArray *aDelIdx; // SArray SArray *aDelData; // SArray SArray *aSkyLine; // SArray - TSDBKEY *aTSDBKEY; - int32_t iKey; - TSDBKEY sKey; + int32_t iDelIdx; + int32_t iSkyLine; + int8_t onGoing; // Reader SDataFReader *pReader; @@ -799,9 +748,38 @@ static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEI pCompactor->tbid = *pId; - // TODO + // tombstone + for (;;) { + if (pCompactor->iDelIdx >= taosArrayGetSize(pCompactor->aDelIdx)) { + if (pCompactor->aSkyLine) taosArrayClear(pCompactor->aSkyLine); + pCompactor->iSkyLine = 0; + break; + } - // update table schema if necessary (TODO) + SDelIdx *pDelIdx = (SDelIdx *)taosArrayGet(pCompactor->aDelIdx, pCompactor->iDelIdx); + int32_t c = tTABLEIDCmprFn(pDelIdx, &pCompactor->tbid); + if (c < 0) { + pCompactor->iDelIdx++; + } else if (c == 0) { + pCompactor->iDelIdx++; + + code = tsdbReadDelData(pCompactor->pDelFReader, pDelIdx, pCompactor->aDelData); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbBuildDeleteSkyline(pCompactor->aDelData, 0, taosArrayGetSize(pCompactor->aDelData) - 1, + pCompactor->aSkyLine); + TSDB_CHECK_CODE(code, lino, _exit); + + pCompactor->iSkyLine = 0; + break; + } else { + if (pCompactor->aSkyLine) taosArrayClear(pCompactor->aSkyLine); + pCompactor->iSkyLine = 0; + break; + } + } + + // reader and write (TODO) code = tsdbUpdateTableSchema(pCompactor->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pCompactor->tbSkm); TSDB_CHECK_CODE(code, lino, _exit); @@ -880,6 +858,43 @@ _exit: return code; } +static bool tsdbCompactRowIsDeleted(STsdbCompactor *pCompactor, TSDBROW *pRow) { + // TSDBKEY tKey = TSDBROW_KEY(pRow); + + // while (tKey.ts > pCompactor->sKey.ts) { + // pCompactor->sKey.version = pCompactor->aTSDBKEY[pCompactor->iKey].version; + // pCompactor->iKey++; + // if (pCompactor->iKey < taosArrayGetSize(pCompactor->aSkyLine)) { + // pCompactor->sKey.ts = pCompactor->aTSDBKEY[pCompactor->iKey].ts; + // } else { + // pCompactor->sKey.ts = TSKEY_MAX; + // } + // } + + // if (tKey.ts < pCompactor->sKey.ts) { + // if (tKey.version > pCompactor->sKey.version) { + // return false; + // } else { + // return true; + // } + // } else if (tKey.ts == pCompactor->sKey.ts) { + // int64_t version; + // if (pCompactor->iKey < taosArrayGetSize(pCompactor->aSkyLine)) { + // version = TMAX(pCompactor->sKey.version, pCompactor->aTSDBKEY[pCompactor->iKey].version); + // } else { + // version = pCompactor->sKey.version; + // } + + // if (tKey.version > version) { + // return false; + // } else { + // return true; + // } + // } + + return false; +} + static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *pRowInfo) { int32_t code = 0; int32_t lino = 0; @@ -892,6 +907,7 @@ static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *p pRowInfo = &rInfo; } + // start a new table data write if need if (pRowInfo->uid != pCompactor->tbid.uid) { if (pCompactor->tbid.uid) { code = tsdbCompactWriteTableDataEnd(pCompactor); @@ -902,7 +918,10 @@ static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *p TSDB_CHECK_CODE(code, lino, _exit); } - code = tBlockDataAppendRow(&pCompactor->bData, &pRowInfo->row, NULL, pRowInfo->uid); + // check if row is deleted + if (pCompactor->onGoing && tsdbCompactRowIsDeleted(pCompactor, &pRowInfo->row)) goto _exit; + + code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, NULL, pRowInfo->uid); TSDB_CHECK_CODE(code, lino, _exit); if (pCompactor->bData.nRow >= pCompactor->maxRows) { @@ -977,6 +996,10 @@ static int32_t tsdbCompactFileSetStart(STsdbCompactor *pCompactor, SDFileSet *pS pCompactor->fid = pSet->fid; pCompactor->tbid = (TABLEID){0}; + /* tombstone */ + pCompactor->iDelIdx = 0; + pCompactor->iSkyLine = 0; + /* reader */ code = tsdbDataFReaderOpen(&pCompactor->pReader, pCompactor->pTsdb, pSet); TSDB_CHECK_CODE(code, lino, _exit); @@ -1114,6 +1137,31 @@ _exit: return code; } +static void tsdbEndCompact(STsdbCompactor *pCompactor) { + int32_t code = 0; + int32_t lino = 0; + + // writer + tBlockDataDestroy(&pCompactor->sData); + tBlockDataDestroy(&pCompactor->bData); + taosArrayDestroy(pCompactor->aSttBlk); + tMapDataClear(&pCompactor->mDataBlk); + taosArrayDestroy(pCompactor->aBlockIdx); + + // reader + + // tombstone + taosArrayDestroy(pCompactor->aSkyLine); + taosArrayDestroy(pCompactor->aDelData); + taosArrayDestroy(pCompactor->aDelIdx); + + // others + tDestroyTSchema(pCompactor->tbSkm.pTSchema); + tsdbFSDestroy(&pCompactor->fs); + + tsdbInfo("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->commitID); +} + static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) { int32_t code = 0; int32_t lino = 0; @@ -1128,26 +1176,22 @@ static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) { code = tsdbFSCopy(pTsdb, &pCompactor->fs); TSDB_CHECK_CODE(code, lino, _exit); - /* tombstone (TODO ) */ -#if 0 + /* tombstone */ if (pCompactor->fs.pDelFile) { code = tsdbDelFReaderOpen(&pCompactor->pDelFReader, pCompactor->fs.pDelFile, pTsdb); TSDB_CHECK_CODE(code, lino, _exit); - pCompactor->aDelIdx = taosArrayInit(0, sizeof(SDelIdx)); - if (pCompactor->aDelIdx == NULL) { + if ((pCompactor->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } - pCompactor->aDelData = taosArrayInit(0, sizeof(SDelData)); - if (pCompactor->aDelData == NULL) { + if ((pCompactor->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } - pCompactor->aSkyLine = taosArrayInit(0, sizeof(TSDBKEY)); - if (pCompactor->aSkyLine == NULL) { + if ((pCompactor->aSkyLine = taosArrayInit(0, sizeof(TSDBKEY))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } @@ -1155,7 +1199,6 @@ static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) { code = tsdbReadDelIdx(pCompactor->pDelFReader, pCompactor->aDelIdx); TSDB_CHECK_CODE(code, lino, _exit); } -#endif /* reader */ @@ -1213,6 +1256,6 @@ _exit: // } else { // tsdbCommitCompact(pCompactor); // } - // tsdbEndCompact(pCompactor); + tsdbEndCompact(pCompactor); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbDataIter.c b/source/dnode/vnode/src/tsdb/tsdbDataIter.c index c8392166ad..4187fd7f60 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataIter.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataIter.c @@ -15,6 +15,8 @@ #include "tsdb.h" +// STsdbDataIter2 + /* open */ int32_t tsdbOpenDataFileDataIter(SDataFReader* pReader, STsdbDataIter2** ppIter) { int32_t code = 0; @@ -392,3 +394,23 @@ int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { } /* get */ + +// STsdbFSetIter +typedef struct STsdbFSetDataIter { + STsdb* pTsdb; + int32_t flags; + + /* tombstone */ + SDelFReader* pDelFReader; + SArray* aDelIdx; // SArray + SArray* aDelData; // SArray + SArray* aSkeyLine; // SArray + int32_t iDelIdx; + int32_t iSkyLine; + + /* time-series data */ + SDataFReader* pReader; + STsdbDataIter2* iterList; + STsdbDataIter2* pIter; + SRBTree rbt; +} STsdbFSetDataIter;