From 014bfc4ad43ce457574b2246f4d70cce072dec1b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 25 Jun 2023 16:27:14 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbDataFileRW.c | 22 +- source/dnode/vnode/src/tsdb/tsdbDataFileRW.h | 10 + source/dnode/vnode/src/tsdb/tsdbUpgrade.c | 321 +++++++++++++------ source/dnode/vnode/src/tsdb/tsdbUpgrade.h | 1 + 4 files changed, 247 insertions(+), 107 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 47d46ed6e2..16559c83ab 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -15,16 +15,6 @@ #include "tsdbDataFileRW.h" -typedef struct { - SFDataPtr brinBlkPtr[1]; - SFDataPtr rsrvd[2]; -} SHeadFooter; - -typedef struct { - SFDataPtr tombBlkPtr[1]; - SFDataPtr rsrvd[2]; -} STombFooter; - extern int32_t tsdbFileDoWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, TTombBlkArray *tombBlkArray, uint8_t **bufArr); extern int32_t tsdbFileDoWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, @@ -1144,15 +1134,19 @@ _exit: return code; } +int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer) { + int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer)); + if (code) return code; + *fileSize += sizeof(*footer); + return 0; +} + static int32_t tsdbDataFileWriteHeadFooter(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - int32_t ftype = TSDB_FTYPE_HEAD; - code = tsdbWriteFile(writer->fd[ftype], writer->files[ftype].size, (const uint8_t *)writer->headFooter, - sizeof(SHeadFooter)); + code = tsdbFileWriteHeadFooter(writer->fd[TSDB_FTYPE_HEAD], &writer->files[TSDB_FTYPE_HEAD].size, writer->headFooter); TSDB_CHECK_CODE(code, lino, _exit); - writer->files[ftype].size += sizeof(SHeadFooter); _exit: if (code) { diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h index bd89cd7e2d..827b58fb4a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h @@ -29,6 +29,16 @@ typedef TARRAY2(SBlockIdx) TBlockIdxArray; typedef TARRAY2(SDataBlk) TDataBlkArray; typedef TARRAY2(SColumnDataAgg) TColumnDataAggArray; +typedef struct { + SFDataPtr brinBlkPtr[1]; + SFDataPtr rsrvd[2]; +} SHeadFooter; + +typedef struct { + SFDataPtr tombBlkPtr[1]; + SFDataPtr rsrvd[2]; +} STombFooter; + // SDataFileReader ============================================= typedef struct SDataFileReader SDataFileReader; typedef struct SDataFileReaderConfig { diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index de78426da1..83db3717ae 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -16,106 +16,231 @@ #include "tsdbUpgrade.h" // old -extern void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t); +extern void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t); +extern int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData); // new extern int32_t save_fs(const TFileSetArray *arr, const char *fname); extern int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype); +extern int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize, + TBrinBlkArray *brinBlkArray, uint8_t **bufArr); +extern int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize); +extern int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer); + +static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { + int32_t code = 0; + int32_t lino = 0; + + struct { + // config + int32_t maxRow; + int8_t cmprAlg; + int32_t szPage; + uint8_t *bufArr[8]; + // reader + SArray *aBlockIdx; + SMapData mDataBlk[1]; + SBlockData blockData[1]; + // writer + STsdbFD *fd; + SBrinBlock brinBlock[1]; + TBrinBlkArray brinBlkArray[1]; + SHeadFooter footer[1]; + } ctx[1] = {{ + .maxRow = tsdb->pVnode->config.tsdbCfg.maxRows, + .cmprAlg = tsdb->pVnode->config.tsdbCfg.compression, + .szPage = tsdb->pVnode->config.tsdbPageSize, + }}; + + if ((ctx->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbReadBlockIdx(reader, ctx->aBlockIdx); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosArrayGetSize(ctx->aBlockIdx) == 0) { + goto _exit; + } else { + STFile file = { + .type = TSDB_FTYPE_HEAD, + .did = pDFileSet->diskId, + .fid = fset->fid, + .cid = pDFileSet->pHeadF->commitID, + .size = pDFileSet->pHeadF->size, + }; + + code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_HEAD]); + TSDB_CHECK_CODE(code, lino, _exit); + + // open fd + char fname[TSDB_FILENAME_LEN]; + tsdbTFileName(tsdb, &file, fname); + + code = tsdbOpenFile(fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); + TSDB_CHECK_CODE(code, lino, _exit); + } + + for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(ctx->aBlockIdx); ++iBlockIdx) { + SBlockIdx *pBlockIdx = taosArrayGet(ctx->aBlockIdx, iBlockIdx); + + code = tsdbReadDataBlk(reader, pBlockIdx, ctx->mDataBlk); + TSDB_CHECK_CODE(code, lino, _exit); + + for (int32_t iDataBlk = 0; iDataBlk < ctx->mDataBlk->nItem; ++iDataBlk) { + SDataBlk dataBlk[1]; + tMapDataGetItemByIdx(ctx->mDataBlk, iDataBlk, dataBlk, tGetDataBlk); + + SBrinRecord record = { + .suid = pBlockIdx->suid, + .uid = pBlockIdx->uid, + .firstKey = dataBlk->minKey.ts, + .firstKeyVer = dataBlk->minKey.version, + .lastKey = dataBlk->maxKey.ts, + .lastKeyVer = dataBlk->maxKey.version, + .minVer = dataBlk->minVer, + .maxVer = dataBlk->maxVer, + .blockOffset = dataBlk->aSubBlock->offset, + .smaOffset = dataBlk->smaInfo.offset, + .blockSize = dataBlk->aSubBlock->szBlock, + .blockKeySize = dataBlk->aSubBlock->szKey, + .smaSize = dataBlk->smaInfo.size, + .numRow = dataBlk->nRow, + .count = dataBlk->nRow, + }; + + if (dataBlk->hasDup) { + code = tsdbReadDataBlockEx(reader, dataBlk, ctx->blockData); + TSDB_CHECK_CODE(code, lino, _exit); + + record.count = 1; + for (int32_t i = 1; i < ctx->blockData->nRow; ++i) { + if (ctx->blockData->aTSKEY[i] != ctx->blockData->aTSKEY[i - 1]) { + record.count++; + } + } + } + + code = tBrinBlockPut(ctx->brinBlock, &record); + TSDB_CHECK_CODE(code, lino, _exit); + + if (BRIN_BLOCK_SIZE(ctx->brinBlock) >= ctx->maxRow) { + code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, + ctx->brinBlkArray, ctx->bufArr); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + } + + if (BRIN_BLOCK_SIZE(ctx->brinBlock) > 0) { + code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, + ctx->brinBlkArray, ctx->bufArr); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = + tsdbFileWriteBrinBlk(ctx->fd, ctx->brinBlkArray, ctx->footer->brinBlkPtr, &fset->farr[TSDB_FTYPE_HEAD]->f->size); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbFileWriteHeadFooter(ctx->fd, &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->footer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbFsyncFile(ctx->fd); + TSDB_CHECK_CODE(code, lino, _exit); + + tsdbCloseFile(&ctx->fd); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + TARRAY2_DESTROY(ctx->brinBlkArray, NULL); + tBrinBlockDestroy(ctx->brinBlock); + tBlockDataDestroy(ctx->blockData); + tMapDataClear(ctx->mDataBlk); + taosArrayDestroy(ctx->aBlockIdx); + for (int32_t i = 0; i < ARRAY_SIZE(ctx->bufArr); ++i) { + tFree(ctx->bufArr[i]); + } + return code; +} + +static int32_t tsdbUpgradeData(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { + int32_t code = 0; + int32_t lino = 0; + + // TODO + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbUpgradeSma(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { + int32_t code = 0; + int32_t lino = 0; + + // TODO + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbUpgradeStt(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { + int32_t code = 0; + int32_t lino = 0; + + // TODO + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} static int32_t tsdbUpgradeFileSet(STsdb *tsdb, SDFileSet *pDFileSet, TFileSetArray *fileSetArray) { int32_t code = 0; int32_t lino = 0; SDataFReader *reader; + STFileSet *fset; + + code = tsdbTFileSetInit(pDFileSet->fid, &fset); + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDataFReaderOpen(&reader, tsdb, pDFileSet); TSDB_CHECK_CODE(code, lino, _exit); // .head - { - SArray *aBlockIdx = NULL; - SMapData mDataBlk[1] = {0}; - SBrinBlock brinBlock[1] = {0}; - TBrinBlkArray brinBlkArray[1] = {0}; - - if ((aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbReadBlockIdx(reader, aBlockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - for (int32_t i = 0; i < taosArrayGetSize(aBlockIdx); ++i) { - SBlockIdx *pBlockIdx = taosArrayGet(aBlockIdx, i); - - code = tsdbReadDataBlk(reader, pBlockIdx, mDataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - for (int32_t j = 0; j < mDataBlk->nItem; ++j) { - SDataBlk dataBlk[1]; - - tMapDataGetItemByIdx(mDataBlk, j, dataBlk, tGetDataBlk); - - SBrinRecord record = { - .suid = pBlockIdx->suid, - .uid = pBlockIdx->uid, - .firstKey = dataBlk->minKey.ts, - .firstKeyVer = dataBlk->minKey.version, - .lastKey = dataBlk->maxKey.ts, - .lastKeyVer = dataBlk->maxKey.version, - .minVer = dataBlk->minVer, - .maxVer = dataBlk->maxVer, - .blockOffset = dataBlk->aSubBlock->offset, - .smaOffset = dataBlk->smaInfo.offset, - .blockSize = dataBlk->aSubBlock->szBlock, - .blockKeySize = dataBlk->aSubBlock->szKey, - .smaSize = dataBlk->smaInfo.size, - .numRow = dataBlk->nRow, - .count = dataBlk->nRow, - }; - - if (dataBlk->hasDup) { - ASSERT(0); - // TODO: need to get count - // record.count = 0; - } - - code = tBrinBlockPut(brinBlock, &record); - TSDB_CHECK_CODE(code, lino, _exit); - - if (BRIN_BLOCK_SIZE(brinBlock) >= tsdb->pVnode->config.tsdbCfg.maxRows) { - // TODO - tBrinBlockClear(brinBlock); - } - } - } - - if (BRIN_BLOCK_SIZE(brinBlock) > 0) { - // TODO - ASSERT(0); - } - - // TODO - ASSERT(0); - - TARRAY2_DESTROY(brinBlkArray, NULL); - tBrinBlockDestroy(brinBlock); - taosArrayDestroy(aBlockIdx); - tMapDataClear(mDataBlk); - } + code = tsdbUpgradeHead(tsdb, pDFileSet, reader, fset); + TSDB_CHECK_CODE(code, lino, _exit); // .data + code = tsdbUpgradeData(tsdb, pDFileSet, reader, fset); + TSDB_CHECK_CODE(code, lino, _exit); // .sma + code = tsdbUpgradeSma(tsdb, pDFileSet, reader, fset); + TSDB_CHECK_CODE(code, lino, _exit); // .stt - for (int32_t i = 0; i < pDFileSet->nSttF; ++i) { - // TODO + if (pDFileSet->nSttF > 0) { + code = tsdbUpgradeStt(tsdb, pDFileSet, reader, fset); + TSDB_CHECK_CODE(code, lino, _exit); } tsdbDataFReaderClose(&reader); + code = TARRAY2_APPEND(fileSetArray, fset); + TSDB_CHECK_CODE(code, lino, _exit); + _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); @@ -258,7 +383,30 @@ _exit: return code; } -static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { +static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, TFileSetArray *fileSetArray) { + int32_t code = 0; + int32_t lino = 0; + + // upgrade each file set + for (int32_t i = 0; i < taosArrayGetSize(tsdb->fs.aDFileSet); i++) { + code = tsdbUpgradeFileSet(tsdb, taosArrayGet(tsdb->fs.aDFileSet, i), fileSetArray); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // upgrade tomb file + if (tsdb->fs.pDelFile != NULL) { + code = tsdbUpgradeTombFile(tsdb, tsdb->fs.pDelFile, fileSetArray); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { int32_t code = 0; int32_t lino = 0; @@ -268,19 +416,8 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { code = tsdbFSOpen(tsdb, rollback); TSDB_CHECK_CODE(code, lino, _exit); - // upgrade each file set - for (int32_t i = 0; i < taosArrayGetSize(tsdb->fs.aDFileSet); i++) { - SDFileSet *pDFileSet = taosArrayGet(tsdb->fs.aDFileSet, i); - - code = tsdbUpgradeFileSet(tsdb, pDFileSet, fileSetArray); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // upgrade tomb file - if (tsdb->fs.pDelFile != NULL) { - code = tsdbUpgradeTombFile(tsdb, tsdb->fs.pDelFile, fileSetArray); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbDoUpgradeFileSystem(tsdb, fileSetArray); + TSDB_CHECK_CODE(code, lino, _exit); // close file system code = tsdbFSClose(tsdb); @@ -292,13 +429,11 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { code = save_fs(fileSetArray, fname); TSDB_CHECK_CODE(code, lino, _exit); - // clear - TARRAY2_DESTROY(fileSetArray, tsdbTFileSetClear); - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } + TARRAY2_DESTROY(fileSetArray, tsdbTFileSetClear); return code; } @@ -308,7 +443,7 @@ int32_t tsdbCheckAndUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { tsdbGetCurrentFName(tsdb, fname, NULL); if (!taosCheckExistFile(fname)) return 0; - int32_t code = tsdbDoUpgradeFileSystem(tsdb, rollback); + int32_t code = tsdbUpgradeFileSystem(tsdb, rollback); if (code) return code; taosRemoveFile(fname); diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.h b/source/dnode/vnode/src/tsdb/tsdbUpgrade.h index 4dec009613..f9aac94e00 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.h +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.h @@ -14,6 +14,7 @@ */ #include "tsdb.h" +#include "tsdbDataFileRW.h" #include "tsdbDef.h" #include "tsdbFS2.h" #include "tsdbUtil2.h"