From f24317fbfff1fefc930d91db5a222e31f2000a76 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 3 Aug 2022 01:45:56 +0000 Subject: [PATCH] more last file refact --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 211 ++++++++++++++++++----- 1 file changed, 171 insertions(+), 40 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 2f57992280..704aa0aab0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -569,7 +569,8 @@ _err: } static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) { - int32_t code = 0; + int32_t code = 0; +#if 0 TSDBROW *pRow; SBlock block; SBlock *pBlock = █ @@ -623,6 +624,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter tBlockDataClearData(pBlockData); } +#endif return code; _err: @@ -653,25 +655,25 @@ _err: return code; } -static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) { - int32_t code = 0; - SBlockIdx blockIdx = {.suid = suid, .uid = uid}; - SBlockIdx *pBlockIdx = &blockIdx; +// static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) { +// int32_t code = 0; +// SBlockIdx blockIdx = {.suid = suid, .uid = uid}; +// SBlockIdx *pBlockIdx = &blockIdx; - code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, NULL, pBlockIdx); - if (code) goto _err; +// code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, NULL, pBlockIdx); +// if (code) goto _err; - if (taosArrayPush(pCommitter->dWriter.aBlockIdx, pBlockIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } +// if (taosArrayPush(pCommitter->dWriter.aBlockIdx, pBlockIdx) == NULL) { +// code = TSDB_CODE_OUT_OF_MEMORY; +// goto _err; +// } - return code; +// return code; -_err: - tsdbError("vgId:%d, commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; -} +// _err: +// tsdbError("vgId:%d, commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); +// return code; +// } static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) { int32_t nRow = 0; @@ -748,6 +750,145 @@ _err: return code; } +static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { + int32_t code = 0; + + ASSERT(pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) >= 0); + ASSERT(pCommitter->dReader.pRow == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRow, pTbData) >= 0); + + // end last if need + if (pTbData->suid == 0 || pTbData->suid != 0 /*todo*/) { + if (pCommitter->dWriter.bDatal.nRow > 0) { + // TODO: code = tsdbCommitBlockDataL(pCommitter); + if (code) goto _err; + } + } + + // merge commit table data + STbDataIter iter = {0}; + STbDataIter *pIter = &iter; + TSDBROW *pRow; + + tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter); + pRow = tsdbTbDataIterGet(pIter); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { + pRow = NULL; + } + + if (pRow == NULL) goto _exit; + + SBlockIdx *pBlockIdx = NULL; + int32_t iBlock = 0; + SBlock block; + SBlock *pBlock = █ + + if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0) { + pBlockIdx = pCommitter->dReader.pBlockIdx; + } + + if (pBlockIdx && iBlock < pCommitter->dReader.mBlock.nItem) { + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); + } else { + pBlock = NULL; + } + + tMapDataReset(&pCommitter->dWriter.mBlock); + while (pBlock && pRow) { + int32_t c = tBlockCmprFn(pBlock, &(SBlock){.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)}); + if (c < 0) { // disk + code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); + if (code) goto _err; + + // next + iBlock++; + if (iBlock < pCommitter->dReader.mBlock.nItem) { + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); + } else { + pBlock = NULL; + } + } else if (c < 0) { // memory + code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1); + if (code) goto _err; + + // next + pRow = tsdbTbDataIterGet(pIter); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { + pRow = NULL; + } + } else { // merge + int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock); + + ASSERT(nOvlp > 0); + + if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) { + code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock); + if (code) goto _err; + } else { + // code = tsdbMergeTableData(pCommitter, pIter, pBlock, NULL, 1); + if (code) goto _err; + } + + // next + pRow = tsdbTbDataIterGet(pIter); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { + pRow = NULL; + } + iBlock++; + if (iBlock < pCommitter->dReader.mBlock.nItem) { + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); + } else { + pBlock = NULL; + } + } + } + + while (pBlock) { + code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); + if (code) goto _err; + + iBlock++; + if (iBlock < pCommitter->dReader.mBlock.nItem) { + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); + } else { + pBlock = NULL; + } + } + + if (pRow) { + code = + tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); + if (code) goto _err; + + pRow = tsdbTbDataIterGet(pIter); + ASSERT(pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey); + } + + // end + if (pCommitter->dWriter.mBlock.nItem > 0) { + SBlockIdx blockIdx = {.suid = pTbData->suid, .uid = pTbData->uid}; + code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, NULL, &blockIdx); + if (code) goto _err; + + if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + +_exit: + pRow = tsdbTbDataIterGet(pIter); + if (pRow) { + pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); + } + + return code; + +_err: + tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + return code; +} + +#if 0 static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { int32_t code = 0; STbDataIter iter = {0}; @@ -913,6 +1054,7 @@ _err: tsdbError("vgId:%d, tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } +#endif static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { int32_t code = 0; @@ -985,8 +1127,14 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { if (pCommitter->dReader.pRow == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRow, &toTable) >= 0) break; // check if same suid - if (0) { - goto _write_block_data; + if (pCommitter->dReader.pRow->suid == 0) { + if (pCommitter->dReader.pRow->uid != 0 /*todo*/) { + // code = tsdbCommitBlockDataL(pCommitter); + if (code) goto _err; + } + } else if (pCommitter->dReader.pRow->suid != 0 /*todo*/) { + // code = tsdbCommitBlockDataL(pCommitter); + if (code) goto _err; } // append @@ -1017,19 +1165,10 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { } // write - if (pCommitter->dWriter.bDatal.nRow < pCommitter->maxRow) continue; - - _write_block_data: - code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, NULL, NULL, NULL, NULL, - pCommitter->cmprAlg); // todo - if (code) goto _err; - - if (taosArrayPush(pCommitter->dWriter.aBlockL, &blockL) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) { + // code = tsdbCommitBlockDataL(pCommitter); + if (code) goto _err; } - - tBlockDataClearData(&pCommitter->dWriter.bDatal); } return code; @@ -1056,7 +1195,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = pTbData->suid, .uid = pTbData->uid}); if (code) goto _err; - // commit current table data commit + // commit current table data code = tsdbCommitTableData(pCommitter, pTbData); if (code) goto _err; } @@ -1065,16 +1204,8 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { if (code) goto _err; if (pCommitter->dWriter.bDatal.nRow > 0) { - SBlockL blockL; - - code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, NULL, NULL, NULL, NULL, - pCommitter->cmprAlg); + // code = tsdbCommitBlockDataL(pCommitter); if (code) goto _err; - - if (taosArrayPush(pCommitter->dWriter.aBlockL, &blockL) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } } // commit file data end