From f43bae0df064d97cf68ff8e1974cc61eab72c4c2 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 23 Aug 2022 17:11:17 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 128 +++++++++++------------ 1 file changed, 64 insertions(+), 64 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 08688b442b..4a31efdc73 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -327,53 +327,53 @@ _exit: return code; } -static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) { - int32_t code = 0; +// static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) { +// int32_t code = 0; - ASSERT(pCommitter->dReader.pReader); - ASSERT(pCommitter->dReader.pRowInfo); +// ASSERT(pCommitter->dReader.pReader); +// ASSERT(pCommitter->dReader.pRowInfo); - SBlockData *pBlockDatal = &pCommitter->dReader.bDatal; - pCommitter->dReader.iRow++; - if (pCommitter->dReader.iRow < pBlockDatal->nRow) { - if (pBlockDatal->uid) { - pCommitter->dReader.pRowInfo->uid = pBlockDatal->uid; - } else { - pCommitter->dReader.pRowInfo->uid = pBlockDatal->aUid[pCommitter->dReader.iRow]; - } - pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(pBlockDatal, pCommitter->dReader.iRow); - } else { - pCommitter->dReader.iBlockL++; - if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { - SBlockL *pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL); - int64_t suid = pBlockL->suid; - int64_t uid = pBlockL->maxUid; +// SBlockData *pBlockDatal = &pCommitter->dReader.bDatal; +// pCommitter->dReader.iRow++; +// if (pCommitter->dReader.iRow < pBlockDatal->nRow) { +// if (pBlockDatal->uid) { +// pCommitter->dReader.pRowInfo->uid = pBlockDatal->uid; +// } else { +// pCommitter->dReader.pRowInfo->uid = pBlockDatal->aUid[pCommitter->dReader.iRow]; +// } +// pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(pBlockDatal, pCommitter->dReader.iRow); +// } else { +// pCommitter->dReader.iBlockL++; +// if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { +// SBlockL *pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL); +// int64_t suid = pBlockL->suid; +// int64_t uid = pBlockL->maxUid; - code = tsdbCommitterUpdateTableSchema(pCommitter, suid, uid); - if (code) goto _exit; +// code = tsdbCommitterUpdateTableSchema(pCommitter, suid, uid); +// if (code) goto _exit; - code = tBlockDataInit(pBlockDatal, suid, suid ? 0 : uid, pCommitter->skmTable.pTSchema); - if (code) goto _exit; +// code = tBlockDataInit(pBlockDatal, suid, suid ? 0 : uid, pCommitter->skmTable.pTSchema); +// if (code) goto _exit; - code = tsdbReadLastBlock(pCommitter->dReader.pReader, pBlockL, pBlockDatal); - if (code) goto _exit; +// code = tsdbReadLastBlock(pCommitter->dReader.pReader, pBlockL, pBlockDatal); +// if (code) goto _exit; - pCommitter->dReader.iRow = 0; - pCommitter->dReader.pRowInfo->suid = pBlockDatal->suid; - if (pBlockDatal->uid) { - pCommitter->dReader.pRowInfo->uid = pBlockDatal->uid; - } else { - pCommitter->dReader.pRowInfo->uid = pBlockDatal->aUid[0]; - } - pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(pBlockDatal, pCommitter->dReader.iRow); - } else { - pCommitter->dReader.pRowInfo = NULL; - } - } +// pCommitter->dReader.iRow = 0; +// pCommitter->dReader.pRowInfo->suid = pBlockDatal->suid; +// if (pBlockDatal->uid) { +// pCommitter->dReader.pRowInfo->uid = pBlockDatal->uid; +// } else { +// pCommitter->dReader.pRowInfo->uid = pBlockDatal->aUid[0]; +// } +// pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(pBlockDatal, pCommitter->dReader.iRow); +// } else { +// pCommitter->dReader.pRowInfo = NULL; +// } +// } -_exit: - return code; -} +// _exit: +// return code; +// } static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) { int32_t code = 0; @@ -779,7 +779,8 @@ _err: } static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { - int32_t code = 0; + int32_t code = 0; +#if 0 STbData *pTbData = pIter->pTbData; int32_t nRow = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}); @@ -938,54 +939,47 @@ _exit: _err: tsdbError("vgId:%d tsdb merge commit last failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); +#endif return code; } static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter) { - int32_t code = 0; + int32_t code = 0; + STbData *pTbData = pIter->pTbData; + int32_t iBlock = 0; + SBlock block; + SBlock *pBlock = █ + TSDBROW *pRow = tsdbTbDataIterGet(pIter); - int32_t iBlock = 0; - SBlock block; - SBlock *pBlock = █ if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0) { tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); } else { pBlock = NULL; } - code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid); - if (code) goto _err; - - tMapDataReset(&pCommitter->dWriter.mBlock); - code = tBlockDataInit(&pCommitter->dReader.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema); - if (code) goto _err; - code = tBlockDataInit(&pCommitter->dWriter.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema); - if (code) goto _err; - - // .data merge while (pBlock && pRow) { - int32_t c = tBlockCmprFn(pBlock, &(SBlock){.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)}); - if (c < 0) { // disk + SBlock tBlock = {.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)}; + int32_t c = tBlockCmprFn(pBlock, &tBlock); + + if (c < 0) { code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); if (code) goto _err; - // next iBlock++; if (iBlock < pCommitter->dReader.mBlock.nItem) { tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); } else { pBlock = NULL; } - } else if (c > 0) { // memory + } else if (c > 0) { code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey); if (code) goto _err; - // next pRow = tsdbTbDataIterGet(pIter); if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { pRow = NULL; } - } else { // merge + } else { int32_t nOvlp = tsdbGetNumOfRowsLessThan(pIter, pBlock->maxKey); ASSERT(nOvlp > 0); @@ -1016,7 +1010,6 @@ static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter) { code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); if (code) goto _err; - // next iBlock++; if (iBlock < pCommitter->dReader.mBlock.nItem) { tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); @@ -1041,21 +1034,28 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { STbDataIter iter = {0}; TSDBROW *pRow; + tMapDataReset(&pCommitter->dWriter.mBlock); + tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, &iter); pRow = tsdbTbDataIterGet(&iter); if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { pRow = NULL; } - if (pRow == NULL) { if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) == 0) { code = tMapDataCopy(&pCommitter->dReader.mBlock, &pCommitter->dWriter.mBlock); if (code) goto _err; } - goto _exit; } + code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid); + if (code) goto _err; + code = tBlockDataInit(&pCommitter->dReader.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema); + if (code) goto _err; + code = tBlockDataInit(&pCommitter->dWriter.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema); + if (code) goto _err; + // commit data code = tsdbMergeCommitData(pCommitter, &iter); if (code) goto _err;