From e0b0ce23951d1aadf94ba409424ecb3aaece4bd3 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 29 Jan 2023 10:21:39 +0800 Subject: [PATCH] refact more code --- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 107 +++++++++++++++++---- 1 file changed, 86 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index b14fbdf189..befaf4a3e0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -29,12 +29,19 @@ extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, typedef struct STsdbDataIter2 STsdbDataIter2; typedef struct STsdbFilterInfo STsdbFilterInfo; +typedef struct { + int64_t suid; + int64_t uid; + SDelData delData; +} SDelInfo; + struct STsdbDataIter2 { STsdbDataIter2* next; SRBTreeNode rbtn; int32_t type; SRowInfo rowInfo; + SDelInfo delInfo; union { // TSDB_MEM_TABLE_DATA_ITER struct { @@ -391,6 +398,54 @@ _exit: return code; } +static int32_t tsdbTombFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { + int32_t code = 0; + int32_t lino = 0; + + for (;;) { + while (pIter->tIter.iDelData < taosArrayGetSize(pIter->tIter.aDelData)) { + SDelData* pDelData = taosArrayGet(pIter->tIter.aDelData, pIter->tIter.iDelData); + + if (pFilterInfo) { + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { + if (pFilterInfo->sver > pDelData->version || pFilterInfo->ever < pDelData->version) { + pIter->tIter.iDelData++; + continue; + } + } + } + + pIter->delInfo.delData = *pDelData; + pIter->tIter.iDelData++; + goto _exit; + } + + for (;;) { + if (pIter->tIter.iDelIdx < taosArrayGetSize(pIter->tIter.aDelIdx)) { + SDelIdx* pDelIdx = taosArrayGet(pIter->tIter.aDelIdx, pIter->tIter.iDelIdx); + + code = tsdbReadDelData(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData); + TSDB_CHECK_CODE(code, lino, _exit); + + pIter->delInfo.suid = pDelIdx->suid; + pIter->delInfo.uid = pDelIdx->uid; + pIter->tIter.iDelData = 0; + pIter->tIter.iDelIdx++; + break; + } else { + pIter->delInfo = (SDelInfo){0}; + goto _exit; + } + } + } + +_exit: + if (code) { + tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + static int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { int32_t code = 0; @@ -401,6 +456,8 @@ static int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilter return tsdbDataFileDataIterNext(pIter, pFilterInfo); } else if (pIter->type == TSDB_STT_FILE_DATA_ITER) { return tsdbSttFileDataIterNext(pIter, pFilterInfo); + } else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) { + return tsdbTombFileDataIterNext(pIter, pFilterInfo); } else { ASSERT(0); return code; @@ -725,23 +782,27 @@ _exit: return code; } -static int32_t tsdbSnapReadGetTombData(STsdbSnapReader* pReader, void* pDelInfo) { - int32_t code = 0; - int32_t lino = 0; - - ASSERT(0); - // TODO -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); +static void tsdbSnapReadGetTombData(STsdbSnapReader* pReader, SDelInfo** ppDelInfo) { + if (pReader->pTIter == NULL || (pReader->pTIter->delInfo.suid == 0 && pReader->pTIter->delInfo.uid == 0)) { + *ppDelInfo = NULL; + } else { + *ppDelInfo = &pReader->pTIter->delInfo; } - return code; } -static int32_t tsdbSnapReadNextTombData(STsdbSnapReader* pReader, void* pDelInfo) { +static int32_t tsdbSnapReadNextTombData(STsdbSnapReader* pReader, SDelInfo** ppDelInfo) { int32_t code = 0; int32_t lino = 0; - // TODO + + code = tsdbDataIterNext2( + pReader->pTIter, + &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION, .sver = pReader->sver, .ever = pReader->ever}); + TSDB_CHECK_CODE(code, lino, _exit); + + if (ppDelInfo) { + tsdbSnapReadGetTombData(pReader, ppDelInfo); + } + _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); @@ -755,6 +816,7 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData) STsdb* pTsdb = pReader->pTsdb; + // open tombstone data iter if need if (pReader->pDelFReader == NULL) { if (pReader->fs.pDelFile == NULL) goto _exit; @@ -764,15 +826,16 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData) code = tsdbOpenTombFileDataIter(pReader->pDelFReader, &pReader->pTIter); TSDB_CHECK_CODE(code, lino, _exit); + + if (pReader->pTIter) { + code = tsdbSnapReadNextTombData(pReader, NULL); + TSDB_CHECK_CODE(code, lino, _exit); + } } - struct { - int64_t suid; - int64_t uid; - SDelData dData; - }* pDelInfo; - code = tsdbSnapReadGetTombData(pReader, &pDelInfo); - TSDB_CHECK_CODE(code, lino, _exit); + // loop to get tombstone data + SDelInfo* pDelInfo; + tsdbSnapReadGetTombData(pReader, &pDelInfo); if (pDelInfo == NULL) goto _exit; @@ -786,7 +849,7 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData) } while (pDelInfo && pDelInfo->suid == pReader->tbid.suid && pDelInfo->uid == pReader->tbid.uid) { - if (taosArrayPush(pReader->aDelData, &pDelInfo->dData) < 0) { + if (taosArrayPush(pReader->aDelData, &pDelInfo->delData) < 0) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } @@ -795,6 +858,7 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData) TSDB_CHECK_CODE(code, lino, _exit); } + // encode tombstone data if (taosArrayGetSize(pReader->aDelData) > 0) { code = tsdbSnapCmprTombData(pReader, ppData); TSDB_CHECK_CODE(code, lino, _exit); @@ -870,9 +934,9 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { if (pReader->pDelFReader) { tsdbDelFReaderClose(&pReader->pDelFReader); } + taosArrayDestroy(pReader->aDelData); // timeseries - tBlockDataDestroy(&pReader->bData, 1); while (pReader->iterList) { STsdbDataIter2* pIter = pReader->iterList; pReader->iterList = pIter->next; @@ -881,6 +945,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { if (pReader->pDataFReader) { tsdbDataFReaderClose(&pReader->pDataFReader); } + tBlockDataDestroy(&pReader->bData, 1); // other tDestroyTSchema(pReader->skmTable.pTSchema);