From 341ff0262436d829067d3ee998f9c4f76e64936a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 26 Aug 2022 13:33:16 +0800 Subject: [PATCH] more code --- source/dnode/vnode/CMakeLists.txt | 1 - source/dnode/vnode/src/tsdb/tsdbCommit.c | 388 ++++++++++++++++++++ source/dnode/vnode/src/tsdb/tsdbMerge.c | 434 ----------------------- 3 files changed, 388 insertions(+), 435 deletions(-) delete mode 100644 source/dnode/vnode/src/tsdb/tsdbMerge.c diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index a3ddad72fc..3489863f37 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -51,7 +51,6 @@ target_sources( "src/tsdb/tsdbRetention.c" "src/tsdb/tsdbDiskData.c" "src/tsdb/tsdbCompress.c" - "src/tsdb/tsdbMerge.c" "src/tsdb/tsdbCompact.c" # tq diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 9f4c405b33..0012bcd5e7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1288,3 +1288,391 @@ _err: tsdbError("vgId:%d, tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } + +// Merger ===================================================================== +typedef struct { + int64_t suid; + int64_t uid; + TSDBROW row; +} SRowInfo; + +typedef struct { + SRowInfo rowInfo; + SDataFReader *pReader; + int32_t iLast; + SArray *aBlockL; // SArray + int32_t iBlockL; + SBlockData bData; + int32_t iRow; +} SLDataIter; + +typedef struct { + SRBTreeNode *pNode; + SRBTree rbt; +} SDataMerger; + +static int32_t tRowInfoCmprFn(const void *p1, const void *p2) { + SRowInfo *pInfo1 = (SRowInfo *)p1; + SRowInfo *pInfo2 = (SRowInfo *)p2; + + if (pInfo1->suid < pInfo2->suid) { + return -1; + } else if (pInfo1->suid > pInfo2->suid) { + return 1; + } + + if (pInfo1->uid < pInfo2->uid) { + return -1; + } else if (pInfo1->uid > pInfo2->uid) { + return 1; + } + + return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row); +} + +static void tDataMergerInit(SDataMerger *pMerger, SArray *aNodeP) { + pMerger->pNode = NULL; + tRBTreeCreate(&pMerger->rbt, tRowInfoCmprFn); + for (int32_t iNode = 0; iNode < taosArrayGetSize(aNodeP); iNode++) { + SRBTreeNode *pNode = (SRBTreeNode *)taosArrayGetP(aNodeP, iNode); + + pNode = tRBTreePut(&pMerger->rbt, pNode); + ASSERT(pNode); + } +} + +extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, + SBlockData *pBlockData); // todo + +static int32_t tDataMergeNext(SDataMerger *pMerger, SRowInfo **ppInfo) { + int32_t code = 0; + + if (pMerger->pNode) { + // next current iter + SLDataIter *pIter = (SLDataIter *)pMerger->pNode->payload; + + pIter->iRow++; + if (pIter->iRow < pIter->bData.nRow) { + pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; + pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); + } else { + pIter->iBlockL++; + if (pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) { + SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); + code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pBlockL, &pIter->bData); + if (code) goto _exit; + + pIter->iRow = 0; + pIter->rowInfo.suid = pIter->bData.suid; + pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; + pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, 0); + } else { + pMerger->pNode = NULL; + } + } + + SRBTreeNode *pMinNode = tRBTreeMin(&pMerger->rbt); + if (pMerger->pNode && pMinNode) { + int32_t c = tRowInfoCmprFn(pMerger->pNode->payload, pMinNode->payload); + if (c > 0) { + pMerger->pNode = tRBTreePut(&pMerger->rbt, pMerger->pNode); + ASSERT(pMerger->pNode); + pMerger->pNode = NULL; + } else { + ASSERT(c); + } + } + } + + if (pMerger->pNode == NULL) { + pMerger->pNode = tRBTreeMin(&pMerger->rbt); + if (pMerger->pNode) { + tRBTreeDrop(&pMerger->rbt, pMerger->pNode); + } + } + + if (pMerger->pNode) { + *ppInfo = &((SLDataIter *)pMerger->pNode->payload)[0].rowInfo; + } else { + *ppInfo = NULL; + } + +_exit: + return code; +} + +// ================================================================================ +typedef struct { + STsdb *pTsdb; + int8_t maxLast; + int32_t minRow; + int32_t maxRow; + int8_t cmprAlg; + int64_t commitID; + STsdbFS fs; + struct { + SDataFReader *pReader; + SArray *aBlockIdx; + SLDataIter *aLDataiter[TSDB_MAX_LAST_FILE]; + SDataMerger merger; + } dReader; + struct { + SDataFWriter *pWriter; + SArray *aBlockIdx; + SArray *aBlockL; + SBlockData bData; + SBlockData bDatal; + } dWriter; +} STsdbMerger; + +static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) { + int32_t code = 0; + STsdb *pTsdb = pMerger->pTsdb; + + // reader + code = tsdbDataFReaderOpen(&pMerger->dReader.pReader, pTsdb, pSet); + if (code) goto _err; + + code = tsdbReadBlockIdx(pMerger->dReader.pReader, pMerger->dReader.aBlockIdx); + if (code) goto _err; + + pMerger->dReader.merger.pNode = NULL; + tRBTreeCreate(&pMerger->dReader.merger.rbt, tRowInfoCmprFn); + for (int8_t iLast = 0; iLast < pSet->nLastF; iLast++) { + SRBTreeNode *pNode = (SRBTreeNode *)taosMemoryCalloc(1, sizeof(*pNode) + sizeof(SLDataIter)); + if (pNode == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + SLDataIter *pIter = (SLDataIter *)pNode->payload; + pIter->pReader = pMerger->dReader.pReader; + pIter->iLast = iLast; + + pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); + if (pIter->aBlockL == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + code = tBlockDataCreate(&pIter->bData); + if (code) goto _err; + + code = tsdbReadBlockL(pMerger->dReader.pReader, iLast, pIter->aBlockL); + if (code) goto _err; + + if (taosArrayGetSize(pIter->aBlockL) == 0) continue; + pIter->iBlockL = 0; + + SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, 0); + code = tsdbReadLastBlockEx(pMerger->dReader.pReader, iLast, pBlockL, &pIter->bData); + if (code) goto _err; + + pIter->iRow = 0; + pIter->rowInfo.suid = pIter->bData.suid; + pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; + pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, 0); + + pNode = tRBTreePut(&pMerger->dReader.merger.rbt, pNode); + ASSERT(pNode); + + pMerger->dReader.aLDataiter[iLast] = pIter; + } + + // writer + SHeadFile fHead = {.commitID = pMerger->commitID}; + SDataFile fData = *pSet->pDataF; + SSmaFile fSma = *pSet->pSmaF; + SLastFile fLast = {.commitID = pMerger->commitID}; + SDFileSet wSet = {.diskId = pSet->diskId, + .fid = pSet->fid, + .nLastF = 1, + .pHeadF = &fHead, + .pDataF = &fData, + .pSmaF = &fSma, + .aLastF[0] = &fLast}; + code = tsdbDataFWriterOpen(&pMerger->dWriter.pWriter, pTsdb, &wSet); + if (code) goto _err; + + return code; + +_err: + tsdbError("vgId:%d tsdb merge file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + +static int32_t tsdbMergeFileDataEnd(STsdbMerger *pMerger) { + int32_t code = 0; + STsdb *pTsdb = pMerger->pTsdb; + + // write aBlockIdx + code = tsdbWriteBlockIdx(pMerger->dWriter.pWriter, pMerger->dWriter.aBlockIdx); + if (code) goto _err; + + // write aBlockL + code = tsdbWriteBlockL(pMerger->dWriter.pWriter, pMerger->dWriter.aBlockL); + if (code) goto _err; + + // update file header + code = tsdbUpdateDFileSetHeader(pMerger->dWriter.pWriter); + if (code) goto _err; + + // upsert SDFileSet + code = tsdbFSUpsertFSet(&pMerger->fs, &pMerger->dWriter.pWriter->wSet); + if (code) goto _err; + + // close and sync + code = tsdbDataFWriterClose(&pMerger->dWriter.pWriter, 1); + if (code) goto _err; + + if (pMerger->dReader.pReader) { + code = tsdbDataFReaderClose(&pMerger->dReader.pReader); + if (code) goto _err; + } + + return code; + +_err: + tsdbError("vgId:%d tsdb merge file data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + +static int32_t tsdbMergeFileData(STsdbMerger *pMerger, SDFileSet *pSet) { + int32_t code = 0; + STsdb *pTsdb = pMerger->pTsdb; + + // start + code = tsdbMergeFileDataStart(pMerger, pSet); + if (code) goto _err; + + // impl + SRowInfo *pInfo; + TABLEID id = {0}; + while (true) { + code = tDataMergeNext(&pMerger->dReader.merger, &pInfo); + if (code) goto _err; + + if (pInfo == NULL) { + // end commit (todo) + break; + } + + if (id.suid != pInfo->suid || id.uid != pInfo->uid) { + // table changed, do something (todo) + } + + code = tBlockDataAppendRow(&pMerger->dWriter.bData, &pInfo->row, NULL, pInfo->uid); + if (code) goto _err; + + if (pMerger->dWriter.bData.nRow >= pMerger->maxRow * 4 / 5) { + // code = tsdbCommitDataBlock(); + if (code) goto _err; + } + } + + // end + code = tsdbMergeFileDataEnd(pMerger); + if (code) goto _err; + + return code; + +_err: + tsdbError("vgId:%d tsdb merge file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + +static int32_t tsdbStartMerge(STsdbMerger *pMerger, STsdb *pTsdb) { + int32_t code = 0; + + pMerger->pTsdb = pTsdb; + pMerger->maxLast = TSDB_DEFAULT_LAST_FILE; + pMerger->commitID = ++pTsdb->pVnode->state.commitID; + code = tsdbFSCopy(pTsdb, &pMerger->fs); + if (code) goto _exit; + + // reader + pMerger->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + if (pMerger->dReader.aBlockIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + // for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { + // pMerger->dReader.aBlockL[iLast] = taosArrayInit(0, sizeof(SBlockL)); + // if (pMerger->dReader.aBlockL[iLast] == NULL) { + // code = TSDB_CODE_OUT_OF_MEMORY; + // goto _exit; + // } + // } + + // writer + pMerger->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + if (pMerger->dWriter.aBlockIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + pMerger->dWriter.aBlockL = taosArrayInit(0, sizeof(SBlockL)); + if (pMerger->dWriter.aBlockL == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + +_exit: + return code; +} + +static int32_t tsdbEndMerge(STsdbMerger *pMerger) { + int32_t code = 0; + STsdb *pTsdb = pMerger->pTsdb; + + code = tsdbFSCommit1(pTsdb, &pMerger->fs); + if (code) goto _err; + + taosThreadRwlockWrlock(&pTsdb->rwLock); + code = tsdbFSCommit2(pTsdb, &pMerger->fs); + if (code) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _err; + } + taosThreadRwlockUnlock(&pTsdb->rwLock); + + // writer + taosArrayDestroy(pMerger->dWriter.aBlockL); + taosArrayDestroy(pMerger->dWriter.aBlockIdx); + + // reader + // for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { + // taosArrayDestroy(pMerger->dReader.aBlockL[iLast]); + // } + taosArrayDestroy(pMerger->dReader.aBlockIdx); + tsdbFSDestroy(&pMerger->fs); + + return code; + +_err: + tsdbError("vgId:%d, tsdb end merge failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + +int32_t tsdbMerge(STsdb *pTsdb) { + int32_t code = 0; + STsdbMerger merger = {0}; + + code = tsdbStartMerge(&merger, pTsdb); + if (code) goto _err; + + for (int32_t iSet = 0; iSet < taosArrayGetSize(merger.fs.aDFileSet); iSet++) { + SDFileSet *pSet = (SDFileSet *)taosArrayGet(merger.fs.aDFileSet, iSet); + if (pSet->nLastF < merger.maxLast) continue; + + code = tsdbMergeFileData(&merger, pSet); + if (code) goto _err; + } + + code = tsdbEndMerge(&merger); + if (code) goto _err; + + return code; + +_err: + tsdbError("vgId:%d tsdb merge failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c deleted file mode 100644 index 19d2cd45f3..0000000000 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ /dev/null @@ -1,434 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "tsdb.h" - -typedef struct { - int64_t suid; - int64_t uid; - TSDBROW row; -} SRowInfo; - -typedef struct { - SRowInfo rowInfo; - SDataFReader *pReader; - int32_t iLast; - SArray *aBlockL; // SArray - int32_t iBlockL; - SBlockData bData; - int32_t iRow; -} SLDataIter; - -typedef struct { - SRBTreeNode *pNode; - SRBTree rbt; -} SDataMerger; - -static int32_t tRowInfoCmprFn(const void *p1, const void *p2) { - SRowInfo *pInfo1 = (SRowInfo *)p1; - SRowInfo *pInfo2 = (SRowInfo *)p2; - - if (pInfo1->suid < pInfo2->suid) { - return -1; - } else if (pInfo1->suid > pInfo2->suid) { - return 1; - } - - if (pInfo1->uid < pInfo2->uid) { - return -1; - } else if (pInfo1->uid > pInfo2->uid) { - return 1; - } - - return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row); -} - -static void tDataMergerInit(SDataMerger *pMerger, SArray *aNodeP) { - pMerger->pNode = NULL; - tRBTreeCreate(&pMerger->rbt, tRowInfoCmprFn); - for (int32_t iNode = 0; iNode < taosArrayGetSize(aNodeP); iNode++) { - SRBTreeNode *pNode = (SRBTreeNode *)taosArrayGetP(aNodeP, iNode); - - pNode = tRBTreePut(&pMerger->rbt, pNode); - ASSERT(pNode); - } -} - -extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, - SBlockData *pBlockData); // todo - -static int32_t tDataMergeNext(SDataMerger *pMerger, SRowInfo **ppInfo) { - int32_t code = 0; - - if (pMerger->pNode) { - // next current iter - SLDataIter *pIter = (SLDataIter *)pMerger->pNode->payload; - - pIter->iRow++; - if (pIter->iRow < pIter->bData.nRow) { - pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; - pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); - } else { - pIter->iBlockL++; - if (pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) { - SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); - code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pBlockL, &pIter->bData); - if (code) goto _exit; - - pIter->iRow = 0; - pIter->rowInfo.suid = pIter->bData.suid; - pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; - pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, 0); - } else { - pMerger->pNode = NULL; - } - } - - SRBTreeNode *pMinNode = tRBTreeMin(&pMerger->rbt); - if (pMerger->pNode && pMinNode) { - int32_t c = tRowInfoCmprFn(pMerger->pNode->payload, pMinNode->payload); - if (c > 0) { - pMerger->pNode = tRBTreePut(&pMerger->rbt, pMerger->pNode); - ASSERT(pMerger->pNode); - pMerger->pNode = NULL; - } else { - ASSERT(c); - } - } - } - - if (pMerger->pNode == NULL) { - pMerger->pNode = tRBTreeMin(&pMerger->rbt); - if (pMerger->pNode) { - tRBTreeDrop(&pMerger->rbt, pMerger->pNode); - } - } - - if (pMerger->pNode) { - *ppInfo = &((SLDataIter *)pMerger->pNode->payload)[0].rowInfo; - } else { - *ppInfo = NULL; - } - -_exit: - return code; -} - -// ================================================================================ -typedef struct { - STsdb *pTsdb; - int8_t maxLast; - int64_t commitID; - STsdbFS fs; - struct { - SDataFReader *pReader; - SArray *aBlockIdx; - SLDataIter *aLDataiter[TSDB_MAX_LAST_FILE]; - SDataMerger merger; - } dReader; - struct { - SDataFWriter *pWriter; - SArray *aBlockIdx; - SArray *aBlockL; - SBlockData bData; - SBlockData bDatal; - } dWriter; -} STsdbMerger; - -static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) { - int32_t code = 0; - STsdb *pTsdb = pMerger->pTsdb; - - // reader - code = tsdbDataFReaderOpen(&pMerger->dReader.pReader, pTsdb, pSet); - if (code) goto _err; - - code = tsdbReadBlockIdx(pMerger->dReader.pReader, pMerger->dReader.aBlockIdx); - if (code) goto _err; - - pMerger->dReader.merger.pNode = NULL; - tRBTreeCreate(&pMerger->dReader.merger.rbt, tRowInfoCmprFn); - for (int8_t iLast = 0; iLast < pSet->nLastF; iLast++) { - SRBTreeNode *pNode = (SRBTreeNode *)taosMemoryCalloc(1, sizeof(*pNode) + sizeof(SLDataIter)); - if (pNode == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - SLDataIter *pIter = (SLDataIter *)pNode->payload; - pIter->pReader = pMerger->dReader.pReader; - pIter->iLast = iLast; - - pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); - if (pIter->aBlockL == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - code = tBlockDataCreate(&pIter->bData); - if (code) goto _err; - - code = tsdbReadBlockL(pMerger->dReader.pReader, iLast, pIter->aBlockL); - if (code) goto _err; - - if (taosArrayGetSize(pIter->aBlockL) == 0) continue; - pIter->iBlockL = 0; - - SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, 0); - code = tsdbReadLastBlockEx(pMerger->dReader.pReader, iLast, pBlockL, &pIter->bData); - if (code) goto _err; - - pIter->iRow = 0; - pIter->rowInfo.suid = pIter->bData.suid; - pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; - pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, 0); - - pNode = tRBTreePut(&pMerger->dReader.merger.rbt, pNode); - ASSERT(pNode); - - pMerger->dReader.aLDataiter[iLast] = pIter; - } - - // writer - SHeadFile fHead = {.commitID = pMerger->commitID}; - SDataFile fData = *pSet->pDataF; - SSmaFile fSma = *pSet->pSmaF; - SLastFile fLast = {.commitID = pMerger->commitID}; - SDFileSet wSet = {.diskId = pSet->diskId, - .fid = pSet->fid, - .nLastF = 1, - .pHeadF = &fHead, - .pDataF = &fData, - .pSmaF = &fSma, - .aLastF[0] = &fLast}; - code = tsdbDataFWriterOpen(&pMerger->dWriter.pWriter, pTsdb, &wSet); - if (code) goto _err; - - return code; - -_err: - tsdbError("vgId:%d tsdb merge file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbMergeFileDataEnd(STsdbMerger *pMerger) { - int32_t code = 0; - STsdb *pTsdb = pMerger->pTsdb; - - // write aBlockIdx - code = tsdbWriteBlockIdx(pMerger->dWriter.pWriter, pMerger->dWriter.aBlockIdx); - if (code) goto _err; - - // write aBlockL - code = tsdbWriteBlockL(pMerger->dWriter.pWriter, pMerger->dWriter.aBlockL); - if (code) goto _err; - - // update file header - code = tsdbUpdateDFileSetHeader(pMerger->dWriter.pWriter); - if (code) goto _err; - - // upsert SDFileSet - code = tsdbFSUpsertFSet(&pMerger->fs, &pMerger->dWriter.pWriter->wSet); - if (code) goto _err; - - // close and sync - code = tsdbDataFWriterClose(&pMerger->dWriter.pWriter, 1); - if (code) goto _err; - - if (pMerger->dReader.pReader) { - code = tsdbDataFReaderClose(&pMerger->dReader.pReader); - if (code) goto _err; - } - - return code; - -_err: - tsdbError("vgId:%d tsdb merge file data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - -typedef struct { - int64_t suid; - int64_t uid; - TSKEY ts; - int64_t version; -} SRInfo; - -static int32_t tRInfoCmprFn(const void *p1, const void *p2) { - SRInfo *pInfo1 = (SRInfo *)p1; - SRInfo *pInfo2 = (SRInfo *)p2; - - // suid - if (pInfo1->suid < pInfo2->suid) { - return -1; - } else if (pInfo1->suid > pInfo2->suid) { - return 1; - } - - // uid - if (pInfo1->uid < pInfo2->uid) { - return -1; - } else if (pInfo1->uid > pInfo2->uid) { - return 1; - } - - // ts - if (pInfo1->ts < pInfo2->ts) { - return -1; - } else if (pInfo1->ts > pInfo2->ts) { - return 1; - } - - // version - if (pInfo1->version < pInfo2->version) { - return -1; - } else if (pInfo1->version > pInfo2->version) { - return 1; - } - - return 0; -} - -static int32_t tsdbMergeFileData(STsdbMerger *pMerger, SDFileSet *pSet) { - int32_t code = 0; - STsdb *pTsdb = pMerger->pTsdb; - - // start - code = tsdbMergeFileDataStart(pMerger, pSet); - if (code) goto _err; - - // impl - SRowInfo *pInfo; - int64_t nRow = 0; - SRInfo rInfo = {.suid = INT64_MIN}; - while (true) { - code = tDataMergeNext(&pMerger->dReader.merger, &pInfo); - if (code) goto _err; - - if (pInfo == NULL) break; - nRow++; - - SRInfo rInfoT = { - .suid = pInfo->suid, .uid = pInfo->uid, .ts = TSDBROW_TS(&pInfo->row), .version = TSDBROW_VERSION(&pInfo->row)}; - ASSERT(tRInfoCmprFn(&rInfoT, &rInfo) > 0); - rInfo = rInfoT; - } - - // end - code = tsdbMergeFileDataEnd(pMerger); - if (code) goto _err; - - return code; - -_err: - tsdbError("vgId:%d tsdb merge file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbStartMerge(STsdbMerger *pMerger, STsdb *pTsdb) { - int32_t code = 0; - - pMerger->pTsdb = pTsdb; - pMerger->maxLast = TSDB_DEFAULT_LAST_FILE; - pMerger->commitID = ++pTsdb->pVnode->state.commitID; - code = tsdbFSCopy(pTsdb, &pMerger->fs); - if (code) goto _exit; - - // reader - pMerger->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pMerger->dReader.aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - // for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { - // pMerger->dReader.aBlockL[iLast] = taosArrayInit(0, sizeof(SBlockL)); - // if (pMerger->dReader.aBlockL[iLast] == NULL) { - // code = TSDB_CODE_OUT_OF_MEMORY; - // goto _exit; - // } - // } - - // writer - pMerger->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pMerger->dWriter.aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - pMerger->dWriter.aBlockL = taosArrayInit(0, sizeof(SBlockL)); - if (pMerger->dWriter.aBlockL == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - -_exit: - return code; -} - -static int32_t tsdbEndMerge(STsdbMerger *pMerger) { - int32_t code = 0; - STsdb *pTsdb = pMerger->pTsdb; - - code = tsdbFSCommit1(pTsdb, &pMerger->fs); - if (code) goto _err; - - taosThreadRwlockWrlock(&pTsdb->rwLock); - code = tsdbFSCommit2(pTsdb, &pMerger->fs); - if (code) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _err; - } - taosThreadRwlockUnlock(&pTsdb->rwLock); - - // writer - taosArrayDestroy(pMerger->dWriter.aBlockL); - taosArrayDestroy(pMerger->dWriter.aBlockIdx); - - // reader - // for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { - // taosArrayDestroy(pMerger->dReader.aBlockL[iLast]); - // } - taosArrayDestroy(pMerger->dReader.aBlockIdx); - tsdbFSDestroy(&pMerger->fs); - - return code; - -_err: - tsdbError("vgId:%d, tsdb end merge failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - -int32_t tsdbMerge(STsdb *pTsdb) { - int32_t code = 0; - STsdbMerger merger = {0}; - - code = tsdbStartMerge(&merger, pTsdb); - if (code) goto _err; - - for (int32_t iSet = 0; iSet < taosArrayGetSize(merger.fs.aDFileSet); iSet++) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(merger.fs.aDFileSet, iSet); - if (pSet->nLastF < merger.maxLast) continue; - - code = tsdbMergeFileData(&merger, pSet); - if (code) goto _err; - } - - code = tsdbEndMerge(&merger); - if (code) goto _err; - - return code; - -_err: - tsdbError("vgId:%d tsdb merge failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} \ No newline at end of file