From 10e6e5267021ea55c2bae24b629604cc2f8a4282 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 31 May 2023 15:19:28 +0800 Subject: [PATCH] more code --- .../vnode/src/tsdb/dev/inc/tsdbSttFileRW.h | 2 +- source/dnode/vnode/src/tsdb/dev/tsdbIter.c | 5 +- source/dnode/vnode/src/tsdb/dev/tsdbMerge.c | 414 ++++++++++++------ 3 files changed, 296 insertions(+), 125 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h index bfad582fc2..faa63310e9 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h @@ -73,10 +73,10 @@ struct SSttFileWriterConfig { int32_t szPage; int8_t cmprAlg; int64_t compactVersion; // compact version + STFile file; SSkmInfo *skmTb; SSkmInfo *skmRow; uint8_t **aBuf; - STFile file; }; #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbIter.c b/source/dnode/vnode/src/tsdb/dev/tsdbIter.c index 7d4e6a9624..9b59e3e97c 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbIter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbIter.c @@ -364,7 +364,10 @@ int32_t tsdbIterMergerInit(const TTsdbIterArray *iterArray, SIterMerger **merger } int32_t tsdbIterMergerClear(SIterMerger **merger) { - taosMemoryFree(merger[0]); + if (merger[0]) { + taosMemoryFree(merger[0]); + merger[0] = NULL; + } return 0; } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index d8c071a073..83efac7ff0 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -25,27 +25,27 @@ typedef struct { int8_t cmprAlg; int64_t compactVersion; int64_t cid; - SSkmInfo skmTb; - SSkmInfo skmRow; - uint8_t *aBuf[5]; + SSkmInfo skmTb[1]; // context struct { - bool opened; - + bool opened; STFileSet *fset; bool toData; int32_t level; + SSttLvl *lvl; + STFileObj *fobj; SRowInfo *row; - SBlockData bData; + SBlockData bData[1]; } ctx[1]; + TFileOpArray fopArr[1]; + // reader TSttFileReaderArray sttReaderArr[1]; - SDataFileReader *dataReader; - TTsdbIterArray iterArr[1]; - SIterMerger *iterMerger; - TFileOpArray fopArr[1]; + // iter + TTsdbIterArray iterArr[1]; + SIterMerger *iterMerger; // writer SSttFileWriter *sttWriter; SDataFileWriter *dataWriter; @@ -63,7 +63,6 @@ static int32_t tsdbMergerOpen(SMerger *merger) { } static int32_t tsdbMergerClose(SMerger *merger) { - // TODO int32_t code = 0; int32_t lino = 0; SVnode *pVnode = merger->tsdb->pVnode; @@ -78,81 +77,94 @@ static int32_t tsdbMergerClose(SMerger *merger) { TSDB_CHECK_CODE(code, lino, _exit); // clear the merge + TARRAY2_FREE(merger->iterArr); + TARRAY2_FREE(merger->sttReaderArr); TARRAY2_FREE(merger->fopArr); + tBlockDataDestroy(merger->ctx->bData); + tDestroyTSchema(merger->skmTb->pTSchema); _exit: if (code) { - } else { + TSDB_ERROR_LOG(vid, lino, code); } return 0; } -static int32_t tsdbMergeNextRow(SMerger *merger) { - // TODO - return 0; -} - -static int32_t tsdbMergeToDataWriteTSDataBlock(SMerger *merger) { - if (merger->ctx->bData.nRow == 0) return 0; +static int32_t tsdbMergeToDataTableEnd(SMerger *merger) { + if (merger->ctx->bData->nRow == 0) return 0; int32_t code = 0; int32_t lino = 0; int32_t vid = TD_VID(merger->tsdb->pVnode); - if (merger->ctx->bData.nRow >= merger->minRow) { - // code = tsdbDataFWriteTSDataBlock(merger->dataWriter, &merger->ctx->bData); - // TSDB_CHECK_CODE(code, lino, _exit); + + if (merger->ctx->bData->nRow < merger->minRow) { + code = tsdbSttFileWriteTSDataBlock(merger->sttWriter, merger->ctx->bData); + TSDB_CHECK_CODE(code, lino, _exit); } else { - code = tsdbSttFileWriteTSDataBlock(merger->sttWriter, &merger->ctx->bData); + code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData); TSDB_CHECK_CODE(code, lino, _exit); } - - tBlockDataReset(&merger->ctx->bData); + tBlockDataClear(merger->ctx->bData); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(vid, lino, code); } return code; } + +static int32_t tsdbMergeToDataTableBegin(SMerger *merger) { + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(merger->tsdb->pVnode); + + code = tsdbUpdateSkmTb(merger->tsdb, (const TABLEID *)merger->ctx->row, merger->skmTb); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tBlockDataInit(merger->ctx->bData, (TABLEID *)merger->ctx->row, merger->skmTb->pTSchema, NULL, 0); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(vid, lino, code); + } + return code; +} + static int32_t tsdbMergeToData(SMerger *merger) { int32_t code = 0; int32_t lino = 0; int32_t vid = TD_VID(merger->tsdb->pVnode); - for (;;) { - code = tsdbMergeNextRow(merger); + while ((merger->ctx->row = tsdbIterMergerGet(merger->iterMerger))) { + if (merger->ctx->row->uid != merger->ctx->bData->uid) { + code = tsdbMergeToDataTableEnd(merger); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbMergeToDataTableBegin(merger); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tBlockDataAppendRow(merger->ctx->bData, &merger->ctx->row->row, NULL, merger->ctx->row->uid); TSDB_CHECK_CODE(code, lino, _exit); - if (!merger->ctx->row) { - code = tsdbMergeToDataWriteTSDataBlock(merger); + if (merger->ctx->bData->nRow >= merger->maxRow) { + code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData); TSDB_CHECK_CODE(code, lino, _exit); - break; + + tBlockDataReset(merger->ctx->bData); } - if (!TABLE_SAME_SCHEMA(merger->ctx->bData.suid, merger->ctx->bData.suid, merger->ctx->row->suid, - merger->ctx->row->uid)) { - code = tsdbMergeToDataWriteTSDataBlock(merger); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbUpdateSkmTb(merger->tsdb, (TABLEID *)merger->ctx->row, &merger->skmTb); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tBlockDataInit(&merger->ctx->bData, (TABLEID *)merger->ctx->row, merger->skmTb.pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tBlockDataAppendRow(&merger->ctx->bData, &merger->ctx->row->row, NULL, merger->ctx->row->uid); + code = tsdbIterMergerNext(merger->iterMerger); TSDB_CHECK_CODE(code, lino, _exit); - - if (merger->ctx->bData.nRow >= merger->maxRow) { - code = tsdbMergeToDataWriteTSDataBlock(merger); - TSDB_CHECK_CODE(code, lino, _exit); - } } + code = tsdbMergeToDataTableEnd(merger); + TSDB_CHECK_CODE(code, lino, _exit); + _exit: if (code) { - tsdbError("vid:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(vid, lino, code); } return code; } @@ -162,13 +174,12 @@ static int32_t tsdbMergeToUpperLevel(SMerger *merger) { int32_t lino = 0; int32_t vid = TD_VID(merger->tsdb->pVnode); - for (;;) { - code = tsdbMergeNextRow(merger); + SRowInfo *row; + while ((row = tsdbIterMergerGet(merger->iterMerger))) { + code = tsdbSttFileWriteTSData(merger->sttWriter, row); TSDB_CHECK_CODE(code, lino, _exit); - if (!merger->ctx->row) break; - - code = tsdbSttFileWriteTSData(merger->sttWriter, merger->ctx->row); + code = tsdbIterMergerNext(merger->iterMerger); TSDB_CHECK_CODE(code, lino, _exit); } @@ -179,97 +190,185 @@ _exit: return code; } -static int32_t tsdbMergeFileSetBegin(SMerger *merger) { - int32_t code = 0; - int32_t lino = 0; - int32_t vid = TD_VID(merger->tsdb->pVnode); - STFileSet *fset = merger->ctx->fset; +static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) { + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(merger->tsdb->pVnode); - // prepare the merger file set - SSttLvl *lvl; - STFileObj *fobj; merger->ctx->toData = true; merger->ctx->level = 0; - - TARRAY2_FOREACH(fset->lvlArr, lvl) { - if (lvl->level != merger->ctx->level) { - lvl = NULL; + TARRAY2_FOREACH(merger->ctx->fset->lvlArr, merger->ctx->lvl) { + if (merger->ctx->lvl->level != merger->ctx->level || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 0) { + merger->ctx->toData = false; + merger->ctx->lvl = NULL; break; } - fobj = TARRAY2_GET(lvl->fobjArr, 0); - if (fobj->f->stt->nseg < merger->tsdb->pVnode->config.sttTrigger) { + ASSERT(merger->ctx->lvl->level == 0 || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 1); + + merger->ctx->fobj = TARRAY2_FIRST(merger->ctx->lvl->fobjArr); + if (merger->ctx->fobj->f->stt->nseg < merger->sttTrigger) { merger->ctx->toData = false; break; } else { - ASSERT(lvl->level == 0 || TARRAY2_SIZE(lvl->fobjArr) == 1); merger->ctx->level++; + // add the operation + STFileOp op = { + .optype = TSDB_FOP_REMOVE, + .fid = merger->ctx->fset->fid, + .of = merger->ctx->fobj->f[0], + }; + code = TARRAY2_APPEND(merger->fopArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + // open the reader SSttFileReader *reader; - SSttFileReaderConfig config = { + SSttFileReaderConfig config[1] = {{ .tsdb = merger->tsdb, - // TODO - }; - code = tsdbSttFReaderOpen(fobj->fname, &config, &reader); + .szPage = merger->szPage, + .file[0] = merger->ctx->fobj->f[0], + }}; + code = tsdbSttFReaderOpen(merger->ctx->fobj->fname, config, &reader); TSDB_CHECK_CODE(code, lino, _exit); code = TARRAY2_APPEND(merger->sttReaderArr, reader); TSDB_CHECK_CODE(code, lino, _exit); + } + } - // add the operation - STFileOp op = { - .fid = fobj->f->fid, - .optype = TSDB_FOP_REMOVE, - .of = fobj->f[0], - }; - code = TARRAY2_APPEND(merger->fopArr, op); +_exit: + if (code) { + TSDB_ERROR_LOG(vid, lino, code); + } + return code; +} + +static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) { + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(merger->tsdb->pVnode); + + SSttFileReader *sttReader; + TARRAY2_FOREACH(merger->sttReaderArr, sttReader) { + const TSttSegReaderArray *segReaderArr; + + code = tsdbSttFReaderGetSegReader(sttReader, &segReaderArr); + TSDB_CHECK_CODE(code, lino, _exit); + + SSttSegReader *segReader; + TARRAY2_FOREACH(segReaderArr, segReader) { + STsdbIter *iter; + + STsdbIterConfig config[1] = {{ + .type = TSDB_ITER_TYPE_STT, + .sttReader = segReader, + }}; + + code = tsdbIterOpen(config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(merger->iterArr, iter); TSDB_CHECK_CODE(code, lino, _exit); } } - // open stt file writer - if (lvl) { - SSttFileWriterConfig config = { + code = tsdbIterMergerInit(merger->iterArr, &merger->iterMerger); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(vid, lino, code); + } + return code; +} + +static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(merger->tsdb->pVnode); + + SDiskID did = { + .level = 0, + .id = 0, + }; // TODO + + if (merger->ctx->lvl) { // to existing level + SSttFileWriterConfig config[1] = {{ .tsdb = merger->tsdb, .maxRow = merger->maxRow, .szPage = merger->szPage, .cmprAlg = merger->cmprAlg, - .skmTb = &merger->skmTb, - .skmRow = &merger->skmRow, - .aBuf = merger->aBuf, - .file = fobj->f[0], - }; - code = tsdbSttFileWriterOpen(&config, &merger->sttWriter); + .compactVersion = merger->compactVersion, + .file = merger->ctx->fobj->f[0], + }}; + code = tsdbSttFileWriterOpen(config, &merger->sttWriter); TSDB_CHECK_CODE(code, lino, _exit); - } else { - SSttFileWriterConfig config = { + } else { // to new level + SSttFileWriterConfig config[1] = {{ .tsdb = merger->tsdb, .maxRow = merger->maxRow, .szPage = merger->szPage, .cmprAlg = merger->cmprAlg, - .skmTb = &merger->skmTb, - .skmRow = &merger->skmRow, - .aBuf = merger->aBuf, + .compactVersion = merger->compactVersion, .file = - (STFile){ + { .type = TSDB_FTYPE_STT, - .did = {.level = 0, .id = 0}, - .fid = fset->fid, + .did = did, + .fid = merger->ctx->fset->fid, .cid = merger->cid, .size = 0, - .stt = {{.level = merger->ctx->level, .nseg = 0}}, + .stt = {{ + .level = merger->ctx->level, + .nseg = 0, + }}, }, - }; - code = tsdbSttFileWriterOpen(&config, &merger->sttWriter); + }}; + code = tsdbSttFileWriterOpen(config, &merger->sttWriter); TSDB_CHECK_CODE(code, lino, _exit); } - // open data file writer - if (merger->ctx->toData) { + if (merger->ctx->toData) { // TODO + tBlockDataReset(merger->ctx->bData); + SDataFileWriterConfig config = { .tsdb = merger->tsdb, - // TODO + .maxRow = merger->maxRow, + .f = + { + [0] = + { + .type = TSDB_FTYPE_HEAD, + .did = did, + .fid = merger->ctx->fset->fid, + .cid = merger->cid, + .size = 0, + }, + [1] = + { + .type = TSDB_FTYPE_DATA, + .did = did, + .fid = merger->ctx->fset->fid, + .cid = merger->cid, + .size = 0, + }, + [2] = + { + .type = TSDB_FTYPE_SMA, + .did = did, + .fid = merger->ctx->fset->fid, + .cid = merger->cid, + .size = 0, + }, + [3] = + { + .type = TSDB_FTYPE_TOMB, + .did = did, + .fid = merger->ctx->fset->fid, + .cid = merger->cid, + .size = 0, + }, + }, }; code = tsdbDataFileWriterOpen(&config, &merger->dataWriter); TSDB_CHECK_CODE(code, lino, _exit); @@ -277,46 +376,110 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(vid, lino, code); } return code; } + +static int32_t tsdbMergeFileSetBegin(SMerger *merger) { + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(merger->tsdb->pVnode); + + ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0); + ASSERT(TARRAY2_SIZE(merger->iterArr) == 0); + ASSERT(merger->iterMerger == NULL); + ASSERT(merger->sttWriter == NULL); + ASSERT(merger->dataWriter == NULL); + + // open reader + code = tsdbMergeFileSetBeginOpenReader(merger); + TSDB_CHECK_CODE(code, lino, _exit); + + // open iterator + code = tsdbMergeFileSetBeginOpenIter(merger); + TSDB_CHECK_CODE(code, lino, _exit); + + // open writer + code = tsdbMergeFileSetBeginOpenWriter(merger); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(vid, lino, code); + } + return code; +} + +static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) { + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(merger->tsdb->pVnode); + + STFileOp op[1]; + + if (merger->ctx->toData) { + code = tsdbDataFileWriterClose(&merger->dataWriter, 0, op); + TSDB_CHECK_CODE(code, lino, _exit); + + if (op->optype != TSDB_FOP_NONE) { + code = TARRAY2_APPEND_PTR(merger->fopArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + code = tsdbSttFileWriterClose(&merger->sttWriter, 0, op); + TSDB_CHECK_CODE(code, lino, _exit); + + if (op->optype != TSDB_FOP_NONE) { + code = TARRAY2_APPEND_PTR(merger->fopArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(vid, lino, code); + } + return code; +} + +static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) { + tsdbIterMergerClear(&merger->iterMerger); + TARRAY2_CLEAR(merger->iterArr, tsdbIterClose); + return 0; +} + +static int32_t tsdbMergeFileSetEndCloseReader(SMerger *merger) { + TARRAY2_CLEAR(merger->sttReaderArr, tsdbSttFReaderClose); + return 0; +} + static int32_t tsdbMergeFileSetEnd(SMerger *merger) { int32_t code = 0; int32_t lino = 0; int32_t vid = TD_VID(merger->tsdb->pVnode); - STFileOp op; - code = tsdbSttFileWriterClose(&merger->sttWriter, 0, &op); + code = tsdbMergeFileSetEndCloseWriter(merger); TSDB_CHECK_CODE(code, lino, _exit); - if (op.optype != TSDB_FOP_NONE) { - code = TARRAY2_APPEND(merger->fopArr, op); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbMergeFileSetEndCloseIter(merger); + TSDB_CHECK_CODE(code, lino, _exit); - if (merger->ctx->toData) { - // code = tsdbDataFWriterClose(); - // TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbMergeFileSetEndCloseReader(merger); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(vid, lino, code); } return code; } + static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) { int32_t code = 0; int32_t lino = 0; - if (!merger->ctx->opened) { - code = tsdbMergerOpen(merger); - TSDB_CHECK_CODE(code, lino, _exit); - } - merger->ctx->fset = fset; - code = tsdbMergeFileSetBegin(merger); TSDB_CHECK_CODE(code, lino, _exit); @@ -350,12 +513,17 @@ static int32_t tsdbDoMerge(SMerger *merger) { SSttLvl *lvl; STFileObj *fobj; TARRAY2_FOREACH(merger->fsetArr, fset) { - lvl = TARRAY2_SIZE(fset->lvlArr) ? TARRAY2_FIRST(fset->lvlArr) : NULL; + lvl = TARRAY2_SIZE(fset->lvlArr) > 0 ? TARRAY2_FIRST(fset->lvlArr) : NULL; if (!lvl || lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) == 0) continue; fobj = TARRAY2_FIRST(lvl->fobjArr); if (fobj->f->stt->nseg < merger->sttTrigger) continue; + if (!merger->ctx->opened) { + code = tsdbMergerOpen(merger); + TSDB_CHECK_CODE(code, lino, _exit); + } + code = tsdbMergeFileSet(merger, fset); TSDB_CHECK_CODE(code, lino, _exit); }