From 2d0882edba25b2e87491a0be4fe2f21df3536382 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 2 Aug 2022 09:06:18 +0000 Subject: [PATCH] more last file refact --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 226 +++++++++++++++++------ 1 file changed, 165 insertions(+), 61 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 3900adf205..2f57992280 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -20,6 +20,12 @@ typedef struct { STSchema *pTSchema; } SSkmInfo; +typedef struct { + int64_t suid; + int64_t uid; + TSDBROW row; +} SRowInfo; + typedef struct { STsdb *pTsdb; /* commit data */ @@ -38,11 +44,20 @@ typedef struct { // commit file data struct { SDataFReader *pReader; - SArray *aBlockIdx; // SArray - SArray *aBlockL; // SArray - SMapData mBlock; // SMapData, read from reader - SBlockData bData; - SBlockData bDatal; + // data + SArray *aBlockIdx; // SArray + int32_t iBlockIdx; + SBlockIdx *pBlockIdx; + SMapData mBlock; // SMapData + SBlockData bData; + // last + SArray *aBlockL; // SArray + int32_t iBlockL; + SBlockL *pBlockL; + SBlockData bDatal; + int32_t iRow; + SRowInfo *pRow; + SRowInfo row; } dReader; struct { SDataFWriter *pWriter; @@ -290,20 +305,46 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet); if (code) goto _err; + // data code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx, NULL); if (code) goto _err; + pCommitter->dReader.iBlockIdx = 0; + if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) { + pCommitter->dReader.pBlockIdx = + (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx); + + code = + tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock, NULL); + if (code) goto _err; + } else { + pCommitter->dReader.pBlockIdx = NULL; + } + + // last code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL, NULL); if (code) goto _err; + pCommitter->dReader.iBlockL = 0; + if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { + pCommitter->dReader.pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL); + + // TODO: code = tsdbReadBlockData(pCommitter->dReader.pReader, NULL, pBlockL, &pCommitter->dReader.bDatal, NULL, + // NULL); + if (code) goto _err; + + pCommitter->dReader.iRow = 0; + pCommitter->dReader.pRow = &pCommitter->dReader.row; + pCommitter->dReader.pRow->suid = pCommitter->dReader.pBlockL->suid; + pCommitter->dReader.pRow->uid = pCommitter->dReader.bDatal.aUid[pCommitter->dReader.iRow]; + pCommitter->dReader.pRow->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow); + } else { + pCommitter->dReader.pRow = NULL; + } } else { - pCommitter->dReader.pReader = NULL; - taosArrayClear(pCommitter->dReader.aBlockIdx); - taosArrayClear(pCommitter->dReader.aBlockL); + pCommitter->dReader.pBlockIdx = NULL; + pCommitter->dReader.pRow = NULL; } - tMapDataReset(&pCommitter->dReader.mBlock); - tBlockDataReset(&pCommitter->dReader.bData); - tBlockDataReset(&pCommitter->dReader.bDatal); // Writer SHeadFile fHead; @@ -707,7 +748,7 @@ _err: return code; } -static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) { +static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { int32_t code = 0; STbDataIter iter = {0}; STbDataIter *pIter = &iter; @@ -716,6 +757,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl int32_t nBlock; int64_t suid; int64_t uid; + SBlockIdx *pBlockIdx = NULL; if (pTbData) { tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter); @@ -908,6 +950,95 @@ _err: return code; } +static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { + int32_t code = 0; + + // data + while (true) { + if (pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) >= 0) break; + + SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx; + code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, NULL, &blockIdx); + if (code) goto _err; + + if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + pCommitter->dReader.iBlockIdx++; + if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) { + pCommitter->dReader.pBlockIdx = + (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx); + + code = + tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock, NULL); + if (code) goto _err; + } else { + pCommitter->dReader.pBlockIdx = NULL; + } + } + + // last + SBlockL blockL; + while (true) { + if (pCommitter->dReader.pRow == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRow, &toTable) >= 0) break; + + // check if same suid + if (0) { + goto _write_block_data; + } + + // append + code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pCommitter->dReader.pRow->row, NULL); + if (code) goto _err; + + // next + pCommitter->dReader.iRow++; + if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) { + pCommitter->dReader.pRow->uid = pCommitter->dReader.bDatal.aUid[pCommitter->dReader.iRow]; + pCommitter->dReader.pRow->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow); + } else { + pCommitter->dReader.iBlockL++; + if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { + pCommitter->dReader.pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL); + + // TODO: code = tsdbReadBlockData(pCommitter->dReader.pReader, NULL, pBlockL, &pCommitter->dReader.bDatal, NULL, + // NULL); + if (code) goto _err; + + pCommitter->dReader.iRow = 0; + pCommitter->dReader.pRow->suid = pCommitter->dReader.pBlockL->suid; + pCommitter->dReader.pRow->uid = pCommitter->dReader.bDatal.aUid[pCommitter->dReader.iRow]; + pCommitter->dReader.pRow->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow); + } else { + pCommitter->dReader.pRow = NULL; + } + } + + // write + if (pCommitter->dWriter.bDatal.nRow < pCommitter->maxRow) continue; + + _write_block_data: + code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, NULL, NULL, NULL, NULL, + pCommitter->cmprAlg); // todo + if (code) goto _err; + + if (taosArrayPush(pCommitter->dWriter.aBlockL, &blockL) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + tBlockDataClearData(&pCommitter->dWriter.bDatal); + } + + return code; + +_err: + tsdbError("vgId:%d tsdb move commit data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + return code; +} + static int32_t tsdbCommitFileData(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; @@ -918,59 +1049,32 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { if (code) goto _err; // commit file data impl - int32_t iTbData = 0; - int32_t nTbData = taosArrayGetSize(pMemTable->aTbData); - int32_t iBlockIdx = 0; - int32_t nBlockIdx = taosArrayGetSize(pCommitter->dReader.aBlockIdx); - STbData *pTbData; - SBlockIdx *pBlockIdx; + for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pMemTable->aTbData); iTbData++) { + STbData *pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - ASSERT(nTbData > 0); + // move commit until current (suid, uid) + code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = pTbData->suid, .uid = pTbData->uid}); + if (code) goto _err; - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL; - while (pTbData || pBlockIdx) { - if (pTbData && pBlockIdx) { - int32_t c = tTABLEIDCmprFn(pTbData, pBlockIdx); + // commit current table data commit + code = tsdbCommitTableData(pCommitter, pTbData); + if (code) goto _err; + } - if (c == 0) { - goto _commit_table_mem_and_disk; - } else if (c < 0) { - goto _commit_table_mem_data; - } else { - goto _commit_table_disk_data; - } - } else if (pBlockIdx) { - goto _commit_table_disk_data; - } else { - goto _commit_table_mem_data; + code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX}); + if (code) goto _err; + + if (pCommitter->dWriter.bDatal.nRow > 0) { + SBlockL blockL; + + code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, NULL, NULL, NULL, NULL, + pCommitter->cmprAlg); + if (code) goto _err; + + if (taosArrayPush(pCommitter->dWriter.aBlockL, &blockL) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; } - - _commit_table_mem_data: - code = tsdbCommitTableData(pCommitter, pTbData, NULL); - if (code) goto _err; - - iTbData++; - pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL; - continue; - - _commit_table_disk_data: - code = tsdbCommitTableData(pCommitter, NULL, pBlockIdx); - if (code) goto _err; - - iBlockIdx++; - pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL; - continue; - - _commit_table_mem_and_disk: - code = tsdbCommitTableData(pCommitter, pTbData, pBlockIdx); - if (code) goto _err; - - iBlockIdx++; - pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL; - iTbData++; - pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL; - continue; } // commit file data end