diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 6040674e8c..8de45989f7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -15,6 +15,10 @@ #include "tsdb.h" +#define TSDB_ITER_TYPE_MEM 0x0 +#define TSDB_ITER_TYPE_DAT 0x1 +#define TSDB_ITER_TYPE_STT 0x2 + typedef struct { } SMemDIter; @@ -37,7 +41,9 @@ typedef struct { int32_t iRow; } SSttDIter; -typedef struct { +typedef struct STsdbDataIter { + struct STsdbDataIter *next; + int32_t flag; SRowInfo rowInfo; SRBTreeNode n; @@ -45,19 +51,22 @@ typedef struct { } STsdbDataIter; typedef struct { - STsdb *pTsdb; - STsdbFS fs; - int64_t cid; - int32_t fid; - SDataFReader *pReader; - SDFileSet *pDFileSet; - SRBTree rtree; - SBlockData bData; + STsdb *pTsdb; + STsdbFS fs; + int64_t cid; + int32_t fid; + SDFileSet *pDFileSet; + SDataFReader *pReader; + STsdbDataIter *iterList; // list of iterators + SRBTree rtree; + SBlockData bData; } STsdbCompactor; #define TSDB_FLG_DEEP_COMPACT 0x1 // ITER ========================= +static int32_t tsdbDataIterNext(STsdbDataIter *pIter); + static int32_t tsdbDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) { const STsdbDataIter *pIter1 = (STsdbDataIter *)((char *)n1 - offsetof(STsdbDataIter, n)); const STsdbDataIter *pIter2 = (STsdbDataIter *)((char *)n2 - offsetof(STsdbDataIter, n)); @@ -95,6 +104,7 @@ static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } + pIter->flag = TSDB_ITER_TYPE_DAT; SDataDIter *pDataDIter = (SDataDIter *)pIter->handle; pDataDIter->pReader = pReader; @@ -109,21 +119,16 @@ static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) if (taosArrayGetSize(pDataDIter->aBlockIdx) == 0) goto _clear_exit; + // TODO code = tBlockDataCreate(&pDataDIter->bData); TSDB_CHECK_CODE(code, lino, _exit); - // read first data block - pDataDIter->iBlockIdx = 0; - code = tsdbReadDataBlk(pReader, taosArrayGet(pDataDIter->aBlockIdx, pDataDIter->iBlockIdx), &pDataDIter->mDataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - + pDataDIter->iBlockIdx = -1; pDataDIter->iDataBlk = 0; - // code = tsdbReadDataBlock(pReader, tMapDat); - // TSDB_CHECK_CODE(code, lino, _exit); - pDataDIter->iRow = 0; - // TODO + code = tsdbDataIterNext(pIter); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { @@ -150,6 +155,7 @@ static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIt code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } + pIter->flag = TSDB_ITER_TYPE_STT; SSttDIter *pSttDIter = (SSttDIter *)pIter->handle; pSttDIter->pReader = pReader; @@ -168,13 +174,11 @@ static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIt code = tBlockDataCreate(&pSttDIter->bData); TSDB_CHECK_CODE(code, lino, _exit); - pSttDIter->iSttBlk = 0; - // code = tsdbReadSttBlock(pReader, taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk), &pSttDIter->bData); - // TSDB_CHECK_CODE(code, lino, _exit); + pSttDIter->iSttBlk = -1; + pSttDIter->iRow = -1; - pSttDIter->iRow = 0; - - // TODO + code = tsdbDataIterNext(pIter); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { @@ -193,12 +197,42 @@ _exit: static void tsdbDataIterClose(STsdbDataIter *pIter) { // TODO + ASSERT(0); } static int32_t tsdbDataIterNext(STsdbDataIter *pIter) { int32_t code = 0; int32_t lino = 0; - // TODO + + if (pIter->flag & TSDB_ITER_TYPE_MEM) { + // TODO + ASSERT(0); + } else if (pIter->flag & TSDB_ITER_TYPE_DAT) { + // TODO + ASSERT(0); + } else if (pIter->flag & TSDB_ITER_TYPE_STT) { + SSttDIter *pSttDIter = (SSttDIter *)pIter->handle; + + pSttDIter->iRow++; + if (pSttDIter->iRow < pSttDIter->bData.nRow) { + ASSERT(0); + } else { + pSttDIter->iSttBlk++; + if (pSttDIter->iSttBlk < taosArrayGetSize(pSttDIter->aSttBlk)) { + code = tsdbReadSttBlock(pSttDIter->pReader, pSttDIter->iStt, + taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk), &pSttDIter->bData); + TSDB_CHECK_CODE(code, lino, _exit); + + pSttDIter->iRow = 0; + } else { + // code = TSDB_CODE_TDB_NO_DATA; + // goto _exit; + } + } + } else { + ASSERT(0); + } + _exit: return code; } @@ -289,6 +323,73 @@ _exit: return code; } +static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) { + int32_t code = 0; + int32_t lino = 0; + + STsdb *pTsdb = pCompactor->pTsdb; + + // next compact file + pCompactor->pDFileSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid}, + tDFileSetCmprFn, TD_GT); + if (pCompactor->pDFileSet == NULL) goto _exit; + + pCompactor->fid = pCompactor->pDFileSet->fid; + + code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet); + TSDB_CHECK_CODE(code, lino, _exit); + + // open iters + STsdbDataIter *pIter; + + pCompactor->iterList = NULL; + tRBTreeCreate(&pCompactor->rtree, tsdbDataIterCmprFn); + + code = tsdbDataDIterOpen(pCompactor->pReader, &pIter); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pIter) { + pIter->next = pCompactor->iterList; + pCompactor->iterList = pIter; + tRBTreePut(&pCompactor->rtree, &pIter->n); + } + + for (int32_t iStt = 0; iStt < pCompactor->pReader->pSet->nSttF; iStt++) { + code = tsdbSttDIterOpen(pCompactor->pReader, iStt, &pIter); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pIter) { + pIter->next = pCompactor->iterList; + pCompactor->iterList = pIter; + tRBTreePut(&pCompactor->rtree, &pIter->n); + } + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); + } + return code; +} + +static void tsdbCloseCompactor(STsdbCompactor *pCompactor) { + STsdb *pTsdb = pCompactor->pTsdb; + + for (STsdbDataIter *pIter = pCompactor->iterList; pIter;) { + STsdbDataIter *pIterNext = pIter->next; + tsdbDataIterClose(pIter); + pIter = pIterNext; + } + + // TODO + ASSERT(0); + +_exit: + tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); +} + int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { int32_t code = 0; int32_t lino = 0; @@ -302,39 +403,22 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { TSDB_CHECK_CODE(code, lino, _exit); while (true) { - compactor.pDFileSet = (SDFileSet *)taosArraySearch(compactor.fs.aDFileSet, &compactor.fid, tDFileSetCmprFn, TD_GT); + code = tsdbOpenCompactor(&compactor); + TSDB_CHECK_CODE(code, lino, _exit); + if (compactor.pDFileSet == NULL) break; - compactor.fid = compactor.pDFileSet->fid; - - code = tsdbDataFReaderOpen(&compactor.pReader, pTsdb, compactor.pDFileSet); - TSDB_CHECK_CODE(code, lino, _exit); - - // open those iterators - tRBTreeCreate(&compactor.rtree, tsdbDataIterCmprFn); - - STsdbDataIter *pIter; - - code = tsdbDataDIterOpen(compactor.pReader, &pIter); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pIter) tRBTreePut(&compactor.rtree, &pIter->n); - - for (int32_t iStt = 0; iStt < compactor.pReader->pSet->nSttF; iStt++) { - code = tsdbSttDIterOpen(compactor.pReader, iStt, &pIter); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pIter) tRBTreePut(&compactor.rtree, &pIter->n); - } - // loop to merge row by row TSDBROW *pRow = NULL; + int64_t nRow = 0; for (;;) { code = tsdbCompactNextRow(&compactor, &pRow); TSDB_CHECK_CODE(code, lino, _exit); if (pRow == NULL) break; + nRow++; + // code = tBlockDataAppendRow(&compactor.bData, pRow, pRow, NULL, 0); // TSDB_CHECK_CODE(code, lino, _exit); @@ -343,6 +427,8 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { // TSDB_CHECK_CODE(code, lino, _exit); // } } + + tsdbCloseCompactor(&compactor); } _exit: