From 039e4a0505ccb6835c4227534099a70c3fade9af Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 26 Aug 2022 10:44:02 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbMerge.c | 77 ++++++++++++++++++++----- 1 file changed, 64 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index 186db38461..19d2cd45f3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -22,11 +22,13 @@ typedef struct { } SRowInfo; typedef struct { - SRowInfo rowInfo; - SArray *aBlockL; // SArray - int32_t iBlockL; - SBlockData bData; - int32_t iRow; + SRowInfo rowInfo; + SDataFReader *pReader; + int32_t iLast; + SArray *aBlockL; // SArray + int32_t iBlockL; + SBlockData bData; + int32_t iRow; } SLDataIter; typedef struct { @@ -64,6 +66,9 @@ static void tDataMergerInit(SDataMerger *pMerger, SArray *aNodeP) { } } +extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, + SBlockData *pBlockData); // todo + static int32_t tDataMergeNext(SDataMerger *pMerger, SRowInfo **ppInfo) { int32_t code = 0; @@ -78,7 +83,8 @@ static int32_t tDataMergeNext(SDataMerger *pMerger, SRowInfo **ppInfo) { } else { pIter->iBlockL++; if (pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) { - // code = tsdbReadLastBlock(NULL, (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL), &pIter->bData); + SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); + code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pBlockL, &pIter->bData); if (code) goto _exit; pIter->iRow = 0; @@ -129,7 +135,7 @@ typedef struct { struct { SDataFReader *pReader; SArray *aBlockIdx; - SLDataIter aLDataiter[TSDB_MAX_LAST_FILE]; + SLDataIter *aLDataiter[TSDB_MAX_LAST_FILE]; SDataMerger merger; } dReader; struct { @@ -141,9 +147,6 @@ typedef struct { } dWriter; } STsdbMerger; -extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, - SBlockData *pBlockData); // todo - static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) { int32_t code = 0; STsdb *pTsdb = pMerger->pTsdb; @@ -165,6 +168,8 @@ static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) { } SLDataIter *pIter = (SLDataIter *)pNode->payload; + pIter->pReader = pMerger->dReader.pReader; + pIter->iLast = iLast; pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); if (pIter->aBlockL == NULL) { @@ -191,6 +196,8 @@ static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) { pNode = tRBTreePut(&pMerger->dReader.merger.rbt, pNode); ASSERT(pNode); + + pMerger->dReader.aLDataiter[iLast] = pIter; } // writer @@ -251,6 +258,48 @@ _err: return code; } +typedef struct { + int64_t suid; + int64_t uid; + TSKEY ts; + int64_t version; +} SRInfo; + +static int32_t tRInfoCmprFn(const void *p1, const void *p2) { + SRInfo *pInfo1 = (SRInfo *)p1; + SRInfo *pInfo2 = (SRInfo *)p2; + + // suid + if (pInfo1->suid < pInfo2->suid) { + return -1; + } else if (pInfo1->suid > pInfo2->suid) { + return 1; + } + + // uid + if (pInfo1->uid < pInfo2->uid) { + return -1; + } else if (pInfo1->uid > pInfo2->uid) { + return 1; + } + + // ts + if (pInfo1->ts < pInfo2->ts) { + return -1; + } else if (pInfo1->ts > pInfo2->ts) { + return 1; + } + + // version + if (pInfo1->version < pInfo2->version) { + return -1; + } else if (pInfo1->version > pInfo2->version) { + return 1; + } + + return 0; +} + static int32_t tsdbMergeFileData(STsdbMerger *pMerger, SDFileSet *pSet) { int32_t code = 0; STsdb *pTsdb = pMerger->pTsdb; @@ -260,9 +309,9 @@ static int32_t tsdbMergeFileData(STsdbMerger *pMerger, SDFileSet *pSet) { if (code) goto _err; // impl - SRowInfo rInfo = {.suid = INT64_MIN}; SRowInfo *pInfo; int64_t nRow = 0; + SRInfo rInfo = {.suid = INT64_MIN}; while (true) { code = tDataMergeNext(&pMerger->dReader.merger, &pInfo); if (code) goto _err; @@ -270,8 +319,10 @@ static int32_t tsdbMergeFileData(STsdbMerger *pMerger, SDFileSet *pSet) { if (pInfo == NULL) break; nRow++; - ASSERT(tRowInfoCmprFn(pInfo, &rInfo) > 0); - rInfo = *pInfo; + SRInfo rInfoT = { + .suid = pInfo->suid, .uid = pInfo->uid, .ts = TSDBROW_TS(&pInfo->row), .version = TSDBROW_VERSION(&pInfo->row)}; + ASSERT(tRInfoCmprFn(&rInfoT, &rInfo) > 0); + rInfo = rInfoT; } // end