From e07afeeeafb004352812f63a7a0d07954af6bf0b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 1 Jun 2023 17:05:20 +0800 Subject: [PATCH] more code --- .../vnode/src/tsdb/dev/inc/tsdbDataFileRW.h | 2 + .../dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c | 14 + source/dnode/vnode/src/tsdb/dev/tsdbMerge.c | 245 ++++++++++++------ .../dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c | 22 +- 4 files changed, 189 insertions(+), 94 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h index 4dc5ce67a0..0a9e3076d4 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h @@ -54,7 +54,9 @@ typedef struct SDataFileWriterConfig { int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer); int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, STFileOp op[/*TSDB_FTYPE_MAX*/]); +int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SRowInfo *row); int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData); +int32_t tsdbDataFileFLushTSDataBlock(SDataFileWriter *writer); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c index 8cd191927d..91e766eae8 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c @@ -440,6 +440,8 @@ _exit: return code; } int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData) { + if (bData->nRow == 0) return 0; + int32_t code = 0; int32_t lino = 0; int32_t vid = TD_VID(writer->config->tsdb->pVnode); @@ -468,3 +470,15 @@ _exit: } return code; } + +int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SRowInfo *row) { + // TODO + ASSERT(0); + return 0; +} + +int32_t tsdbDataFileFLushTSDataBlock(SDataFileWriter *writer) { + // TODO + ASSERT(0); + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 73dd5465fc..9f20b92a59 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -18,25 +18,29 @@ typedef struct { STsdb *tsdb; TFileSetArray *fsetArr; - int32_t sttTrigger; - int32_t maxRow; - int32_t minRow; - int32_t szPage; - int8_t cmprAlg; - int64_t compactVersion; - int64_t cid; - SSkmInfo skmTb[1]; + + int32_t sttTrigger; + int32_t maxRow; + int32_t minRow; + int32_t szPage; + int8_t cmprAlg; + int64_t compactVersion; + int64_t cid; + SSkmInfo skmTb[1]; + SSkmInfo skmRow[1]; // context struct { bool opened; + int64_t now; STFileSet *fset; bool toData; int32_t level; SSttLvl *lvl; STFileObj *fobj; - SRowInfo *row; - SBlockData bData[1]; + TABLEID tbid[1]; + int32_t bDataIdx; + SBlockData bData[2]; } ctx[1]; TFileOpArray fopArr[1]; @@ -52,6 +56,7 @@ typedef struct { } SMerger; static int32_t tsdbMergerOpen(SMerger *merger) { + merger->ctx->now = taosGetTimestampMs(); merger->maxRow = merger->tsdb->pVnode->config.tsdbCfg.maxRows; merger->minRow = merger->tsdb->pVnode->config.tsdbCfg.minRows; merger->szPage = merger->tsdb->pVnode->config.tsdbPageSize; @@ -63,52 +68,98 @@ static int32_t tsdbMergerOpen(SMerger *merger) { } static int32_t tsdbMergerClose(SMerger *merger) { - int32_t code = 0; - int32_t lino = 0; - SVnode *pVnode = merger->tsdb->pVnode; - int32_t vid = TD_VID(pVnode); - STFileSystem *fs = merger->tsdb->pFS; + int32_t code = 0; + int32_t lino = 0; + SVnode *pVnode = merger->tsdb->pVnode; // edit file system - code = tsdbFSEditBegin(fs, merger->fopArr, TSDB_FEDIT_MERGE); + code = tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbFSEditCommit(fs); + code = tsdbFSEditCommit(merger->tsdb->pFS); TSDB_CHECK_CODE(code, lino, _exit); + ASSERT(merger->dataWriter == NULL); + ASSERT(merger->sttWriter == NULL); + ASSERT(merger->iterMerger == NULL); + ASSERT(TARRAY2_SIZE(merger->iterArr) == 0); + ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0); + // clear the merge TARRAY2_FREE(merger->iterArr); TARRAY2_FREE(merger->sttReaderArr); TARRAY2_FREE(merger->fopArr); - tBlockDataDestroy(merger->ctx->bData); + for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) { + tBlockDataDestroy(merger->ctx->bData + i); + } tDestroyTSchema(merger->skmTb->pTSchema); + tDestroyTSchema(merger->skmRow->pTSchema); _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(pVnode), lino, code); } - return 0; + return code; } static int32_t tsdbMergeToDataTableEnd(SMerger *merger) { - if (merger->ctx->bData->nRow == 0) return 0; + if (merger->ctx->bData[0].nRow + merger->ctx->bData[1].nRow == 0) return 0; int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(merger->tsdb->pVnode); + int32_t cidx = merger->ctx->bDataIdx; + int32_t pidx = (cidx + 1) % 2; - if (merger->ctx->bData->nRow < merger->minRow) { - code = tsdbSttFileWriteTSDataBlock(merger->sttWriter, merger->ctx->bData); + if (merger->ctx->bData[pidx].nRow > 0) { + ASSERT(merger->ctx->bData[pidx].nRow == merger->maxRow); + + int32_t numRow = (merger->ctx->bData[pidx].nRow + merger->ctx->bData[cidx].nRow) / 2; + + SRowInfo row[1] = {{ + .suid = merger->ctx->tbid->suid, + .uid = merger->ctx->tbid->uid, + .row = tsdbRowFromBlockData(merger->ctx->bData + pidx, 0), + }}; + + for (int32_t i = 0; i < numRow; i++) { + row->row.iRow = i; + + code = tsdbDataFileWriteTSData(merger->dataWriter, row); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbDataFileFLushTSDataBlock(merger->dataWriter); TSDB_CHECK_CODE(code, lino, _exit); + + for (int32_t i = numRow; i < merger->ctx->bData[pidx].nRow; i++) { + row->row.iRow = i; + code = tsdbDataFileWriteTSData(merger->dataWriter, row); + TSDB_CHECK_CODE(code, lino, _exit); + } + + row->row = tsdbRowFromBlockData(merger->ctx->bData + cidx, 0); + for (int32_t i = 0; i < merger->ctx->bData[cidx].nRow; i++) { + row->row.iRow = i; + code = tsdbDataFileWriteTSData(merger->dataWriter, row); + TSDB_CHECK_CODE(code, lino, _exit); + } } else { - code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData); - TSDB_CHECK_CODE(code, lino, _exit); + if (merger->ctx->bData[cidx].nRow < merger->minRow) { + code = tsdbSttFileWriteTSDataBlock(merger->sttWriter, merger->ctx->bData + cidx); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData + cidx); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) { + tBlockDataReset(merger->ctx->bData + i); } - tBlockDataClear(merger->ctx->bData); _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code); } return code; } @@ -116,43 +167,62 @@ _exit: static int32_t tsdbMergeToDataTableBegin(SMerger *merger) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(merger->tsdb->pVnode); - code = tsdbUpdateSkmTb(merger->tsdb, (const TABLEID *)merger->ctx->row, merger->skmTb); + code = tsdbUpdateSkmTb(merger->tsdb, merger->ctx->tbid, merger->skmTb); TSDB_CHECK_CODE(code, lino, _exit); - code = tBlockDataInit(merger->ctx->bData, (TABLEID *)merger->ctx->row, merger->skmTb->pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); + for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) { + code = tBlockDataInit(merger->ctx->bData, merger->ctx->tbid, merger->skmTb->pTSchema, NULL, 0); + TSDB_CHECK_CODE(code, lino, _exit); + } _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code); } return code; } -static int32_t tsdbMergeToData(SMerger *merger) { +static int32_t tsdbMergeToDataLevel(SMerger *merger) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(merger->tsdb->pVnode); - while ((merger->ctx->row = tsdbIterMergerGet(merger->iterMerger))) { - if (merger->ctx->row->uid != merger->ctx->bData->uid) { + for (SRowInfo *row; (row = tsdbIterMergerGet(merger->iterMerger)) != NULL;) { + if (row->uid != merger->ctx->tbid->uid) { code = tsdbMergeToDataTableEnd(merger); TSDB_CHECK_CODE(code, lino, _exit); + merger->ctx->tbid->suid = row->suid; + merger->ctx->tbid->uid = row->uid; + code = tsdbMergeToDataTableBegin(merger); TSDB_CHECK_CODE(code, lino, _exit); } - code = tBlockDataAppendRow(merger->ctx->bData, &merger->ctx->row->row, NULL, merger->ctx->row->uid); - TSDB_CHECK_CODE(code, lino, _exit); + TSDBKEY key[1] = {TSDBROW_KEY(&row->row)}; - if (merger->ctx->bData->nRow >= merger->maxRow) { - code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData); + if (key->version <= merger->compactVersion // + && merger->ctx->bData[merger->ctx->bDataIdx].nRow > 0 // + && merger->ctx->bData[merger->ctx->bDataIdx].aTSKEY[merger->ctx->bData[merger->ctx->bDataIdx].nRow - 1] == + key->ts) { + // update + code = tBlockDataUpdateRow(merger->ctx->bData + merger->ctx->bDataIdx, &row->row, NULL); TSDB_CHECK_CODE(code, lino, _exit); + } else { + if (merger->ctx->bData[merger->ctx->bDataIdx].nRow >= merger->maxRow) { + int32_t idx = (merger->ctx->bDataIdx + 1) % 2; - tBlockDataReset(merger->ctx->bData); + code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData + idx); + TSDB_CHECK_CODE(code, lino, _exit); + + tBlockDataClear(merger->ctx->bData + idx); + + // switch to next bData + merger->ctx->bDataIdx = idx; + } + + code = tBlockDataAppendRow(merger->ctx->bData + merger->ctx->bDataIdx, &row->row, NULL, row->uid); + TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbIterMergerNext(merger->iterMerger); @@ -164,7 +234,7 @@ static int32_t tsdbMergeToData(SMerger *merger) { _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code); } return code; } @@ -193,7 +263,6 @@ _exit: static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(merger->tsdb->pVnode); merger->ctx->toData = true; merger->ctx->level = 0; @@ -213,7 +282,7 @@ static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) { } else { merger->ctx->level++; - // add the operation + // add remove operation STFileOp op = { .optype = TSDB_FOP_REMOVE, .fid = merger->ctx->fset->fid, @@ -239,7 +308,7 @@ static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) { _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code); } return code; } @@ -288,12 +357,8 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { int32_t lino = 0; int32_t vid = TD_VID(merger->tsdb->pVnode); - SDiskID did = { - .level = 0, - .id = 0, - }; // TODO - - if (merger->ctx->lvl) { // to existing level + if (merger->ctx->lvl) { + // to existing level SSttFileWriterConfig config[1] = {{ .tsdb = merger->tsdb, .maxRow = merger->maxRow, @@ -304,7 +369,15 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { }}; code = tsdbSttFileWriterOpen(config, &merger->sttWriter); TSDB_CHECK_CODE(code, lino, _exit); - } else { // to new level + } else { + SDiskID did[1]; + int32_t level = tsdbFidLevel(merger->ctx->fset->fid, &merger->tsdb->keepCfg, merger->ctx->now); + if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, did) < 0) { + code = TSDB_CODE_FS_NO_VALID_DISK; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // to new level SSttFileWriterConfig config[1] = {{ .tsdb = merger->tsdb, .maxRow = merger->maxRow, @@ -314,7 +387,7 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { .file = { .type = TSDB_FTYPE_STT, - .did = did, + .did = did[0], .fid = merger->ctx->fset->fid, .cid = merger->cid, .size = 0, @@ -328,8 +401,14 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { TSDB_CHECK_CODE(code, lino, _exit); } - if (merger->ctx->toData) { // TODO - tBlockDataReset(merger->ctx->bData); + if (merger->ctx->toData) { + // TODO + SDiskID did[1]; + int32_t level = tsdbFidLevel(merger->ctx->fset->fid, &merger->tsdb->keepCfg, merger->ctx->now); + if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, did) < 0) { + code = TSDB_CODE_FS_NO_VALID_DISK; + TSDB_CHECK_CODE(code, lino, _exit); + } SDataFileWriterConfig config = { .tsdb = merger->tsdb, @@ -339,7 +418,7 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { [0] = { .type = TSDB_FTYPE_HEAD, - .did = did, + .did = did[0], .fid = merger->ctx->fset->fid, .cid = merger->cid, .size = 0, @@ -347,7 +426,7 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { [1] = { .type = TSDB_FTYPE_DATA, - .did = did, + .did = did[0], .fid = merger->ctx->fset->fid, .cid = merger->cid, .size = 0, @@ -355,7 +434,7 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { [2] = { .type = TSDB_FTYPE_SMA, - .did = did, + .did = did[0], .fid = merger->ctx->fset->fid, .cid = merger->cid, .size = 0, @@ -363,7 +442,7 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { [3] = { .type = TSDB_FTYPE_TOMB, - .did = did, + .did = did[0], .fid = merger->ctx->fset->fid, .cid = merger->cid, .size = 0, @@ -384,7 +463,6 @@ _exit: static int32_t tsdbMergeFileSetBegin(SMerger *merger) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(merger->tsdb->pVnode); ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0); ASSERT(TARRAY2_SIZE(merger->iterArr) == 0); @@ -392,6 +470,13 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { ASSERT(merger->sttWriter == NULL); ASSERT(merger->dataWriter == NULL); + merger->ctx->tbid->suid = 0; + merger->ctx->tbid->uid = 0; + merger->ctx->bDataIdx = 0; + for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); ++i) { + tBlockDataReset(merger->ctx->bData + i); + } + // open reader code = tsdbMergeFileSetBeginOpenReader(merger); TSDB_CHECK_CODE(code, lino, _exit); @@ -406,7 +491,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code); } return code; } @@ -416,17 +501,7 @@ static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) { int32_t lino = 0; int32_t vid = TD_VID(merger->tsdb->pVnode); - STFileOp op[1]; - - if (merger->ctx->toData) { - code = tsdbDataFileWriterClose(&merger->dataWriter, 0, op); - TSDB_CHECK_CODE(code, lino, _exit); - - if (op->optype != TSDB_FOP_NONE) { - code = TARRAY2_APPEND_PTR(merger->fopArr, op); - TSDB_CHECK_CODE(code, lino, _exit); - } - } + STFileOp op[TSDB_FTYPE_MAX]; code = tsdbSttFileWriterClose(&merger->sttWriter, 0, op); TSDB_CHECK_CODE(code, lino, _exit); @@ -436,6 +511,17 @@ static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) { TSDB_CHECK_CODE(code, lino, _exit); } + if (merger->ctx->toData) { + // TODO + code = tsdbDataFileWriterClose(&merger->dataWriter, 0, op); + TSDB_CHECK_CODE(code, lino, _exit); + + if (op->optype != TSDB_FOP_NONE) { + code = TARRAY2_APPEND_PTR(merger->fopArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + _exit: if (code) { TSDB_ERROR_LOG(vid, lino, code); @@ -485,7 +571,7 @@ static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) { // do merge if (merger->ctx->toData) { - code = tsdbMergeToData(merger); + code = tsdbMergeToDataLevel(merger); TSDB_CHECK_CODE(code, lino, _exit); } else { code = tsdbMergeToUpperLevel(merger); @@ -507,16 +593,13 @@ _exit: static int32_t tsdbDoMerge(SMerger *merger) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(merger->tsdb->pVnode); STFileSet *fset; - SSttLvl *lvl; - STFileObj *fobj; TARRAY2_FOREACH(merger->fsetArr, fset) { - lvl = TARRAY2_SIZE(fset->lvlArr) > 0 ? TARRAY2_FIRST(fset->lvlArr) : NULL; + SSttLvl *lvl = TARRAY2_SIZE(fset->lvlArr) > 0 ? TARRAY2_FIRST(fset->lvlArr) : NULL; if (!lvl || lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) == 0) continue; - fobj = TARRAY2_FIRST(lvl->fobjArr); + STFileObj *fobj = TARRAY2_FIRST(lvl->fobjArr); if (fobj->f->stt->nseg < merger->sttTrigger) continue; if (!merger->ctx->opened) { @@ -535,9 +618,9 @@ static int32_t tsdbDoMerge(SMerger *merger) { _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code); } else { - tsdbDebug("vgId:%d %s done", vid, __func__); + tsdbDebug("vgId:%d %s done", TD_VID(merger->tsdb->pVnode), __func__); } return code; } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c index 5251e79df1..a03f976812 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c @@ -775,19 +775,15 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) { } // row to col conversion - if (key->version <= writer->config->compactVersion) { - if (writer->bData->nRow > 0 // - && (writer->bData->uid // - ? writer->bData->uid - : writer->bData->aUid[writer->bData->nRow - 1]) == row->uid // - && writer->bData->aTSKEY[writer->bData->nRow - 1] == key->ts // - ) { - code = tBlockDataUpdateRow(writer->bData, &row->row, writer->config->skmRow->pTSchema); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tBlockDataAppendRow(writer->bData, &row->row, writer->config->skmRow->pTSchema, row->uid); - TSDB_CHECK_CODE(code, lino, _exit); - } + if (key->version <= writer->config->compactVersion // + && writer->bData->nRow > 0 // + && (writer->bData->uid // + ? writer->bData->uid // + : writer->bData->aUid[writer->bData->nRow - 1]) == row->uid // + && writer->bData->aTSKEY[writer->bData->nRow - 1] == key->ts // + ) { + code = tBlockDataUpdateRow(writer->bData, &row->row, writer->config->skmRow->pTSchema); + TSDB_CHECK_CODE(code, lino, _exit); } else { if (writer->bData->nRow >= writer->config->maxRow) { code = tsdbSttFileDoWriteTSDataBlock(writer);