From 1427f9218ca0e85540b8b1c6f36ce9dc3dfba27e Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 27 Dec 2022 16:59:52 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbCompact.c | 162 ++++++++++++++++------ 1 file changed, 122 insertions(+), 40 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 55592a59c4..649a123100 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -65,12 +65,13 @@ typedef struct { SRBTree rtree; STsdbDataIter *pIter; SBlockData bData; + SSkmInfo tbSkm; } STsdbCompactor; #define TSDB_FLG_DEEP_COMPACT 0x1 // ITER ========================= -static int32_t tsdbDataIterNext(STsdbDataIter *pIter); +static int32_t tsdbDataIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId); static int32_t tsdbDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) { const STsdbDataIter *pIter1 = (STsdbDataIter *)((char *)n1 - offsetof(STsdbDataIter, n)); @@ -132,7 +133,7 @@ static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) pDataDIter->iDataBlk = 0; pDataDIter->iRow = 0; - code = tsdbDataIterNext(pIter); + code = tsdbDataIterNext(pIter, NULL); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -182,7 +183,7 @@ static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIt pSttDIter->iSttBlk = -1; pSttDIter->iRow = -1; - code = tsdbDataIterNext(pIter); + code = tsdbDataIterNext(pIter, NULL); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -205,7 +206,7 @@ static void tsdbDataIterClose(STsdbDataIter *pIter) { ASSERT(0); } -static int32_t tsdbDataIterNext(STsdbDataIter *pIter) { +static int32_t tsdbDataIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId) { int32_t code = 0; int32_t lino = 0; @@ -218,25 +219,49 @@ static int32_t tsdbDataIterNext(STsdbDataIter *pIter) { } else if (pIter->flag & TSDB_ITER_TYPE_STT) { SSttDIter *pSttDIter = (SSttDIter *)pIter->handle; - pSttDIter->iRow++; - if (pSttDIter->iRow < pSttDIter->bData.nRow) { + for (;;) { + if (++pSttDIter->iRow >= pSttDIter->bData.nRow) { + for (;;) { + if (++pSttDIter->iSttBlk < taosArrayGetSize(pSttDIter->aSttBlk)) { + SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk); + + // check exclusion + if (pExcludeTableId) { + if (pExcludeTableId->uid) { // exclude (suid, uid) + if (pSttBlk->minUid == pExcludeTableId->uid && pSttBlk->maxUid == pExcludeTableId->uid) continue; + } else { // exclude (suid, *) + if (pSttBlk->suid == pExcludeTableId->suid) continue; + } + } + + code = tsdbReadSttBlockEx(pSttDIter->pReader, pSttDIter->iStt, pSttBlk, &pSttDIter->bData); + TSDB_CHECK_CODE(code, lino, _exit); + + pIter->rowInfo.suid = pSttBlk->suid; + pSttDIter->iRow = 0; + break; + } else { + // iter end, all set 0 and exit + pIter->rowInfo.suid = 0; + pIter->rowInfo.uid = 0; + goto _exit; + } + } + } + pIter->rowInfo.uid = pSttDIter->bData.uid ? pSttDIter->bData.uid : pSttDIter->bData.aUid[pSttDIter->iRow]; pIter->rowInfo.row = tsdbRowFromBlockData(&pSttDIter->bData, pSttDIter->iRow); - } else { - pSttDIter->iSttBlk++; - if (pSttDIter->iSttBlk < taosArrayGetSize(pSttDIter->aSttBlk)) { - code = tsdbReadSttBlockEx(pSttDIter->pReader, pSttDIter->iStt, - taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk), &pSttDIter->bData); - TSDB_CHECK_CODE(code, lino, _exit); - pSttDIter->iRow = 0; - pIter->rowInfo.suid = pSttDIter->bData.suid; - pIter->rowInfo.uid = pSttDIter->bData.uid ? pSttDIter->bData.uid : pSttDIter->bData.aUid[pSttDIter->iRow]; - pIter->rowInfo.row = tsdbRowFromBlockData(&pSttDIter->bData, pSttDIter->iRow); - } else { - pIter->rowInfo.suid = 0; - pIter->rowInfo.uid = 0; + // check exclusion + if (pExcludeTableId) { + if (pExcludeTableId->uid) { // exclude (suid, uid) + if (pIter->rowInfo.uid == pExcludeTableId->uid) continue; + } else { // exclude (suid, *) + if (pIter->rowInfo.suid == pExcludeTableId->suid) continue; + } } + + break; } } else { ASSERT(0); @@ -330,37 +355,94 @@ _exit: return code; } +static int32_t tsdbCompactNextRowImpl(STsdbCompactor *pCompactor, TABLEID *pExcludeTableId) { + int32_t code = 0; + int32_t lino = 0; + + for (;;) { + if (pCompactor->pIter) { + code = tsdbDataIterNext(pCompactor->pIter, pExcludeTableId); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) { + pCompactor->pIter = NULL; + } else { + SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree); + if (pNode) { + STsdbDataIter *pIter = TSDB_DATA_ITER_FROM_RBTN(pNode); + + int32_t c = tRowInfoCmprFn(&pCompactor->pIter->rowInfo, &pIter->rowInfo); + ASSERT(c); + + if (c > 0) { + tRBTreePut(&pCompactor->rtree, &pCompactor->pIter->n); + pCompactor->pIter = NULL; + } + } + } + } + + if (pCompactor->pIter == NULL) { + SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree); + if (pNode) { + pCompactor->pIter = TSDB_DATA_ITER_FROM_RBTN(pNode); + tRBTreeDrop(&pCompactor->rtree, pNode); + + if (pExcludeTableId) { + if (pExcludeTableId->uid) { + if (pCompactor->pIter->rowInfo.uid == pExcludeTableId->uid) continue; + } else { + if (pCompactor->pIter->rowInfo.suid == pExcludeTableId->suid) continue; + } + } + } + } + + break; + } + +_exit: + return code; +} + static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) { int32_t code = 0; int32_t lino = 0; - if (pCompactor->pIter) { - code = tsdbDataIterNext(pCompactor->pIter); + for (;;) { + code = tsdbCompactNextRowImpl(pCompactor, NULL); TSDB_CHECK_CODE(code, lino, _exit); - if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) { - pCompactor->pIter = NULL; - } else { - SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree); - if (pNode) { - STsdbDataIter *pIter = TSDB_DATA_ITER_FROM_RBTN(pNode); + // check if the table of the row exists + if (pCompactor->pIter) { + if (pCompactor->pIter->rowInfo.suid == pCompactor->tbSkm.suid && + pCompactor->pIter->rowInfo.uid == pCompactor->tbSkm.uid) { + break; + } else { + SMetaInfo info; + if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pCompactor->pIter->rowInfo.uid, &info, NULL) != + TSDB_CODE_SUCCESS) { + // table not exist + } else { + // update table schema + STSchema *pTSchema = + metaGetTbTSchema(pCompactor->pTsdb->pVnode->pMeta, pCompactor->pIter->rowInfo.uid, info.version, 1); + if (pTSchema == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } - int32_t c = tRowInfoCmprFn(&pCompactor->pIter->rowInfo, &pIter->rowInfo); - ASSERT(c); + pCompactor->tbSkm.suid = pCompactor->pIter->rowInfo.suid; + pCompactor->tbSkm.uid = pCompactor->pIter->rowInfo.uid; + tDestroyTSchema(pCompactor->tbSkm.pTSchema); + pCompactor->tbSkm.pTSchema = pTSchema; - if (c > 0) { - tRBTreePut(&pCompactor->rtree, &pCompactor->pIter->n); - pCompactor->pIter = NULL; + break; } } - } - } - - if (pCompactor->pIter == NULL) { - SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree); - if (pNode) { - pCompactor->pIter = TSDB_DATA_ITER_FROM_RBTN(pNode); - tRBTreeDrop(&pCompactor->rtree, pNode); + } else { + // iter end, just break out + break; } }