diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 26e691b249..4b43871096 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -20,6 +20,32 @@ typedef struct { STSchema *pTSchema; } SSkmInfo; +typedef struct { + int64_t suid; + int64_t uid; + TSDBROW row; +} SRowInfo; + +typedef struct { + SRBTreeNode n; + SRowInfo r; + int8_t type; + union { + struct { + SArray *aTbDataP; + int32_t iTbDataP; + STbDataIter iter; + }; // memory data iter + struct { + int32_t iLast; + SArray *aBlockL; + int32_t iBlockL; + SBlockData bData; + int32_t iRow; + }; // last file data iter + }; +} SDataIter; + typedef struct { STsdb *pTsdb; int8_t toMerge; @@ -47,6 +73,10 @@ typedef struct { SMapData mBlock; // SMapData SBlockData bData; } dReader; + struct { + SDataIter *pIter; + SRBTree rbt; + }; struct { SDataFWriter *pWriter; SArray *aBlockIdx; // SArray @@ -65,6 +95,9 @@ typedef struct { SArray *aDelData; // SArray } SCommitter; +extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, + SBlockData *pBlockData); // todo + static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter); static int32_t tsdbCommitData(SCommitter *pCommitter); static int32_t tsdbCommitDel(SCommitter *pCommitter); @@ -1289,28 +1322,7 @@ _err: return code; } -// Merger ===================================================================== -typedef struct { - int64_t suid; - int64_t uid; - TSDBROW row; -} SRowInfo; - -typedef struct { - SRowInfo rowInfo; - SDataFReader *pReader; - int32_t iLast; - SArray *aBlockL; // SArray - int32_t iBlockL; - SBlockData bData; - int32_t iRow; -} SLDataIter; - -typedef struct { - SRBTreeNode *pNode; - SRBTree rbt; -} SDataMerger; - +// ================================================================================ static int32_t tRowInfoCmprFn(const void *p1, const void *p2) { SRowInfo *pInfo1 = (SRowInfo *)p1; SRowInfo *pInfo2 = (SRowInfo *)p2; @@ -1330,6 +1342,165 @@ static int32_t tRowInfoCmprFn(const void *p1, const void *p2) { return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row); } +static int32_t tsdbNextCommitRow(SCommitter *pCommitter, SRowInfo **ppInfo) { + int32_t code = 0; + + if (pCommitter->pIter) { + SDataIter *pIter = pCommitter->pIter; + if (pCommitter->pIter->type == 0) { // memory + tsdbTbDataIterNext(&pIter->iter); + TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter); + while (true) { + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { + pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); + pRow = NULL; + } + + if (pRow) { + pIter->r.suid = pIter->iter.pTbData->suid; + pIter->r.uid = pIter->iter.pTbData->uid; + pIter->r.row = *pRow; + break; + } + + pIter->iTbDataP++; + if (pIter->iTbDataP < taosArrayGetSize(pIter->aTbDataP)) { + STbData *pTbData = (STbData *)taosArrayGetP(pIter->aTbDataP, pIter->iTbDataP); + TSDBKEY keyFrom = {.ts = pCommitter->minKey, .version = VERSION_MIN}; + tsdbTbDataIterOpen(pTbData, &keyFrom, 0, &pIter->iter); + pRow = tsdbTbDataIterGet(&pIter->iter); + continue; + } else { + pCommitter->pIter = NULL; + break; + } + } + } else if (pCommitter->pIter->type == 1) { // last file + pIter->iRow++; + if (pIter->iRow < pIter->bData.nRow) { + pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; + pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); + } else { + pIter->iBlockL++; + if (pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) { + SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); + + code = tsdbReadLastBlockEx(pCommitter->dReader.pReader, pIter->iLast, pBlockL, &pIter->bData); + if (code) goto _exit; + + pIter->iRow = 0; + pIter->r.suid = pIter->bData.suid; + pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; + pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0); + } else { + pCommitter->pIter = NULL; + } + } + } else { + ASSERT(0); + } + + // compare with min in RB Tree + pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt); + if (pCommitter->pIter && pIter) { + int32_t c = tRowInfoCmprFn(&pCommitter->pIter->r, &pIter->r); + if (c > 0) { + tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter); + pCommitter->pIter = NULL; + } else { + ASSERT(c); + } + } + } + + if (pCommitter->pIter == NULL) { + pCommitter->pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt); + if (pCommitter->pIter) { + tRBTreeDrop(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter); + } + } + + if (pCommitter->pIter) { + *ppInfo = &pCommitter->pIter->r; + } else { + *ppInfo = NULL; + } + +_exit: + return code; +} + +static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { + int32_t code = 0; + + SRowInfo *pRowInfo = NULL; + TABLEID id = {0}; + while (true) { + code = tsdbNextCommitRow(pCommitter, &pRowInfo); + if (code) goto _err; + + if (pRowInfo == NULL) { + // end the commit (todo) + break; + } + + if (id.suid != pRowInfo->suid || id.uid != pRowInfo->uid) { + // table changed, end current table commit (todo) + + // prepare the new + id.suid = pRowInfo->suid; + id.uid = pRowInfo->uid; + } + + SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx; + if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) { + while (true) { + /* code */ + } + } + + if (pRowInfo->row.type == 0) { + code = tsdbCommitterUpdateRowSchema(pCommitter, pRowInfo->suid, pRowInfo->uid, TSDBROW_SVERSION(&pRowInfo->row)); + if (code) goto _err; + } + + code = tBlockDataAppendRow(&pCommitter->dWriter.bData, &pRowInfo->row, pCommitter->skmRow.pTSchema, pRowInfo->uid); + if (code) goto _err; + + if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) { + if (1 /*toLastOnly*/) { + code = tsdbCommitLastBlock(pCommitter); + if (code) goto _err; + } else { + code = tsdbCommitDataBlock(pCommitter, NULL); + if (code) goto _err; + } + } + } + + return code; + +_err: + tsdbError("vgId:%d tsdb commit file data impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + return code; +} + +// ================================================================================ +typedef struct { + SRowInfo rowInfo; + SDataFReader *pReader; + int32_t iLast; + SArray *aBlockL; // SArray + int32_t iBlockL; + SBlockData bData; + int32_t iRow; +} SLDataIter; + +typedef struct { + SRBTreeNode *pNode; + SRBTree rbt; +} SDataMerger; + static void tDataMergerInit(SDataMerger *pMerger, SArray *aNodeP) { pMerger->pNode = NULL; tRBTreeCreate(&pMerger->rbt, tRowInfoCmprFn); @@ -1341,9 +1512,6 @@ static void tDataMergerInit(SDataMerger *pMerger, SArray *aNodeP) { } } -extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, - SBlockData *pBlockData); // todo - static int32_t tDataMergeNext(SDataMerger *pMerger, SRowInfo **ppInfo) { int32_t code = 0; @@ -1401,7 +1569,6 @@ _exit: return code; } -// ================================================================================ typedef struct { STsdb *pTsdb; int8_t maxLast;