From 5abaf4409d9ab517bfd872f151b550da57ad2bb8 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 26 Dec 2022 17:12:08 +0800 Subject: [PATCH] mroe code --- source/dnode/vnode/src/tsdb/tsdbCompact.c | 76 ++++++++++++++++++++--- 1 file changed, 68 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 8de45989f7..9f3d84d80e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -50,6 +50,8 @@ typedef struct STsdbDataIter { char handle[]; } STsdbDataIter; +#define TSDB_DATA_ITER_FROM_RBTN(N) ((STsdbDataIter *)((char *)N - offsetof(STsdbDataIter, n))) + typedef struct { STsdb *pTsdb; STsdbFS fs; @@ -59,6 +61,7 @@ typedef struct { SDataFReader *pReader; STsdbDataIter *iterList; // list of iterators SRBTree rtree; + STsdbDataIter *pIter; SBlockData bData; } STsdbCompactor; @@ -215,18 +218,22 @@ static int32_t tsdbDataIterNext(STsdbDataIter *pIter) { pSttDIter->iRow++; if (pSttDIter->iRow < pSttDIter->bData.nRow) { - ASSERT(0); + pIter->rowInfo.uid = pSttDIter->bData.uid ? pSttDIter->bData.uid : pSttDIter->bData.aUid[pSttDIter->iRow]; + pIter->rowInfo.row = tsdbRowFromBlockData(&pSttDIter->bData, pSttDIter->iRow); } else { pSttDIter->iSttBlk++; if (pSttDIter->iSttBlk < taosArrayGetSize(pSttDIter->aSttBlk)) { - code = tsdbReadSttBlock(pSttDIter->pReader, pSttDIter->iStt, - taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk), &pSttDIter->bData); + code = tsdbReadSttBlockEx(pSttDIter->pReader, pSttDIter->iStt, + taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk), &pSttDIter->bData); TSDB_CHECK_CODE(code, lino, _exit); pSttDIter->iRow = 0; + pIter->rowInfo.suid = pSttDIter->bData.suid; + pIter->rowInfo.uid = pSttDIter->bData.uid ? pSttDIter->bData.uid : pSttDIter->bData.aUid[pSttDIter->iRow]; + pIter->rowInfo.row = tsdbRowFromBlockData(&pSttDIter->bData, pSttDIter->iRow); } else { - // code = TSDB_CODE_TDB_NO_DATA; - // goto _exit; + pIter->rowInfo.suid = 0; + pIter->rowInfo.uid = 0; } } } else { @@ -315,10 +322,59 @@ _exit: return code; } -static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor, TSDBROW **ppRow) { +static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) { int32_t code = 0; int32_t lino = 0; - // TODO + + if (pCompactor->pIter) { + code = tsdbDataIterNext(pCompactor->pIter); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) { + pCompactor->pIter = NULL; + } else { + SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree); + if (pNode) { + STsdbDataIter *pIter = TSDB_DATA_ITER_FROM_RBTN(pNode); + + int32_t c = tRowInfoCmprFn(&pCompactor->pIter->rowInfo, &pIter->rowInfo); + ASSERT(c); + + if (c > 0) { + tRBTreePut(&pCompactor->rtree, &pCompactor->pIter->n); + pCompactor->pIter = NULL; + } + } + } + } + + if (pCompactor->pIter == NULL) { + SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree); + if (pNode) { + pCompactor->pIter = TSDB_DATA_ITER_FROM_RBTN(pNode); + tRBTreeDrop(&pCompactor->rtree, pNode); + } + } + +_exit: + return code; +} + +static int32_t tsdbCompactGetRow(STsdbCompactor *pCompactor, TSDBROW **ppRow) { + int32_t code = 0; + int32_t lino = 0; + + if (pCompactor->pIter == NULL) { + code = tsdbCompactNextRow(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (pCompactor->pIter) { + *ppRow = &pCompactor->pIter->rowInfo.row; + } else { + *ppRow = NULL; + } + _exit: return code; } @@ -364,6 +420,7 @@ static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) { tRBTreePut(&pCompactor->rtree, &pIter->n); } } + pCompactor->pIter = NULL; _exit: if (code) { @@ -412,13 +469,16 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { TSDBROW *pRow = NULL; int64_t nRow = 0; for (;;) { - code = tsdbCompactNextRow(&compactor, &pRow); + code = tsdbCompactGetRow(&compactor, &pRow); TSDB_CHECK_CODE(code, lino, _exit); if (pRow == NULL) break; nRow++; + code = tsdbCompactNextRow(&compactor); + TSDB_CHECK_CODE(code, lino, _exit); + // code = tBlockDataAppendRow(&compactor.bData, pRow, pRow, NULL, 0); // TSDB_CHECK_CODE(code, lino, _exit);