From 9a4f6abe5429325b88bafd8913014e7139cfa3da Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 12 Jun 2023 16:12:29 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/dev/tsdbCommit.c | 357 +++++++++++++++---- source/dnode/vnode/src/tsdb/dev/tsdbFS.c | 2 +- source/dnode/vnode/src/tsdb/dev/tsdbMerge.c | 3 +- 3 files changed, 298 insertions(+), 64 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index 9c7d05138e..f515a4ebc2 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -19,6 +19,10 @@ typedef struct { STsdb *tsdb; TFileSetArray *fsetArr; + TFileOpArray fopArray[1]; + + SSkmInfo skmTb[1]; + SSkmInfo skmRow[1]; int32_t minutes; int8_t precision; @@ -41,13 +45,15 @@ typedef struct { TABLEID tbid[1]; } ctx[1]; - TFileOpArray fopArray[1]; - TTsdbIterArray iterArray[1]; - SIterMerger *iterMerger; + SSttFileReader *sttReader; + TTsdbIterArray iterArray[1]; + SIterMerger *iterMerger; // writer - SSttFileWriter *sttWriter; + SBlockData blockData[2]; + int32_t blockDataIdx; SDataFileWriter *dataWriter; + SSttFileWriter *sttWriter; } SCommitter2; static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *committer) { @@ -63,7 +69,7 @@ static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *committer) { SSttFileWriterConfig config[1] = {{ .tsdb = committer->tsdb, .maxRow = committer->maxRow, - .szPage = committer->tsdb->pVnode->config.tsdbPageSize, + .szPage = committer->szPage, .cmprAlg = committer->cmprAlg, .compactVersion = committer->compactVersion, .file = @@ -116,6 +122,16 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; + // if (committer->sttTrigger == 1) { + // SDataFileWriterConfig config = { + // // TODO + // }; + + // code = tsdbDataFileWriterOpen(&config, &committer->dataWriter); + // TSDB_CHECK_CODE(code, lino, _exit); + // // TODO + // } + // stt writer if (!committer->ctx->fset) { return tsdbCommitOpenNewSttWriter(committer); @@ -133,11 +149,10 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) { return tsdbCommitOpenExistSttWriter(committer, fobj->f); } - // data writer - if (0) { - // TODO +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); } - return code; } @@ -148,25 +163,21 @@ static int32_t tsdbCommitWriteDelData(SCommitter2 *committer, int64_t suid, int6 return code; } -static int32_t tsdbCommitTSData(SCommitter2 *committer) { - int32_t code = 0; - int32_t lino = 0; - int64_t nRow = 0; - int32_t vid = TD_VID(committer->tsdb->pVnode); - SRowInfo *row; +static int32_t tsdbCommitTSDataOpenIterMerger(SCommitter2 *committer) { + int32_t code = 0; + int32_t lino = 0; - if (committer->tsdb->imem->nRow == 0) goto _exit; + ASSERT(TARRAY2_SIZE(committer->iterArray) == 0); + ASSERT(committer->iterMerger == NULL); - // open iter and iter merger STsdbIter *iter; - STsdbIterConfig config[1] = {{ - .type = TSDB_ITER_TYPE_MEMT, - .memt = committer->tsdb->imem, - .from = {{ - .ts = committer->ctx->minKey, - .version = VERSION_MIN, - }}, - }}; + STsdbIterConfig config[1]; + + // memtable iter + config->type = TSDB_ITER_TYPE_MEMT; + config->memt = committer->tsdb->imem; + config->from->ts = committer->ctx->minKey; + config->from->version = VERSION_MIN; code = tsdbIterOpen(config, &iter); TSDB_CHECK_CODE(code, lino, _exit); @@ -174,18 +185,219 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { code = TARRAY2_APPEND(committer->iterArray, iter); TSDB_CHECK_CODE(code, lino, _exit); + // stt file iter + if (committer->sttReader) { + const TSttSegReaderArray *readerArray; + + tsdbSttFileReaderGetSegReader(committer->sttReader, &readerArray); + + SSttSegReader *segReader; + TARRAY2_FOREACH(readerArray, segReader) { + config->type = TSDB_ITER_TYPE_STT; + config->sttReader = segReader; + } + + code = tsdbIterOpen(config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(committer->iterArray, iter); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // open iter merger code = tsdbIterMergerOpen(committer->iterArray, &committer->iterMerger, false); TSDB_CHECK_CODE(code, lino, _exit); - // loop iter - while ((row = tsdbIterMergerGetData(committer->iterMerger)) != NULL) { +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbCommitTSDataCloseIterMerger(SCommitter2 *committer) { + int32_t code = 0; + int32_t lino = 0; + + tsdbIterMergerClose(&committer->iterMerger); + TARRAY2_CLEAR(committer->iterArray, tsdbIterClose); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbCommitTSDataToDataTableBegin(SCommitter2 *committer, const TABLEID *tbid) { + int32_t code = 0; + int32_t lino = 0; + + committer->ctx->tbid->suid = tbid->suid; + committer->ctx->tbid->uid = tbid->uid; + + code = tsdbUpdateSkmTb(committer->tsdb, committer->ctx->tbid, committer->skmTb); + TSDB_CHECK_CODE(code, lino, _exit); + + committer->blockDataIdx = 0; + for (int32_t i = 0; i < ARRAY_SIZE(committer->blockData); i++) { + code = tBlockDataInit(&committer->blockData[i], committer->ctx->tbid, committer->skmTb->pTSchema, NULL, 0); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbCommitTSDataToDataTableEnd(SCommitter2 *committer) { + if (committer->ctx->tbid->uid == 0) return 0; + + int32_t code = 0; + int32_t lino = 0; + + int32_t cidx = committer->blockDataIdx; + int32_t pidx = ((cidx + 1) & 1); + int32_t numRow = (committer->blockData[cidx].nRow + committer->blockData[pidx].nRow) / 2; + + if (committer->blockData[pidx].nRow > 0 && numRow >= committer->minRow) { + ASSERT(committer->blockData[pidx].nRow == committer->maxRow); + + SRowInfo row[1] = {{ + .suid = committer->ctx->tbid->suid, + .uid = committer->ctx->tbid->uid, + .row = tsdbRowFromBlockData(committer->blockData + pidx, 0), + }}; + + for (int32_t i = 0; i < numRow; i++) { + row->row.iRow = i; + + code = tsdbDataFileWriteRow(committer->dataWriter, row); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbDataFileFlush(committer->dataWriter); + TSDB_CHECK_CODE(code, lino, _exit); + + for (int32_t i = numRow; i < committer->blockData[pidx].nRow; i++) { + row->row.iRow = i; + code = tsdbDataFileWriteRow(committer->dataWriter, row); + TSDB_CHECK_CODE(code, lino, _exit); + } + + row->row = tsdbRowFromBlockData(committer->blockData + cidx, 0); + for (int32_t i = 0; i < committer->blockData[cidx].nRow; i++) { + row->row.iRow = i; + code = tsdbDataFileWriteRow(committer->dataWriter, row); + TSDB_CHECK_CODE(code, lino, _exit); + } + } else { + if (committer->blockData[pidx].nRow > 0) { + code = tsdbDataFileWriteBlockData(committer->dataWriter, committer->blockData + cidx); + TSDB_CHECK_CODE(code, lino, _exit); + } + if (committer->blockData[cidx].nRow < committer->minRow) { + code = tsdbSttFileWriteBlockData(committer->sttWriter, committer->blockData + cidx); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbDataFileWriteBlockData(committer->dataWriter, committer->blockData + cidx); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + for (int32_t i = 0; i < ARRAY_SIZE(committer->blockData); i++) { + tBlockDataReset(&committer->blockData[i]); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbCommitTSDataToData(SCommitter2 *committer) { + int32_t code = 0; + int32_t lino = 0; + + SMetaInfo info; + for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->iterMerger)) != NULL;) { + if (row->uid != committer->ctx->tbid->uid) { + // end last table write + code = tsdbCommitTSDataToDataTableEnd(committer); + TSDB_CHECK_CODE(code, lino, _exit); + + // Ignore table of obsolescence + if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) { + code = tsdbIterMergerSkipTableData(committer->iterMerger, (TABLEID *)row); + TSDB_CHECK_CODE(code, lino, _exit); + continue; + } + + code = tsdbCommitTSDataToDataTableBegin(committer, (TABLEID *)row); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (row->row.type == TSDBROW_ROW_FMT) { + code = tsdbUpdateSkmRow(committer->tsdb, committer->ctx->tbid, TSDBROW_SVERSION(&row->row), committer->skmRow); + TSDB_CHECK_CODE(code, lino, _exit); + } + + TSDBKEY key = TSDBROW_KEY(&row->row); + if (key.version <= committer->compactVersion // + && committer->blockData[committer->blockDataIdx].nRow > 0 // + && key.ts == committer->blockData[committer->blockDataIdx] + .aTSKEY[committer->blockData[committer->blockDataIdx].nRow - 1]) { + code = + tBlockDataUpdateRow(committer->blockData + committer->blockDataIdx, &row->row, committer->skmRow->pTSchema); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + if (committer->blockData[committer->blockDataIdx].nRow >= committer->maxRow) { + int32_t idx = ((committer->blockDataIdx + 1) & 1); + if (committer->blockData[idx].nRow >= committer->maxRow) { + code = tsdbDataFileWriteBlockData(committer->dataWriter, committer->blockData + idx); + TSDB_CHECK_CODE(code, lino, _exit); + + tBlockDataClear(committer->blockData + idx); + } + committer->blockDataIdx = idx; + } + + code = tBlockDataAppendRow(&committer->blockData[committer->blockDataIdx], &row->row, committer->skmRow->pTSchema, + row->uid); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbIterMergerNext(committer->iterMerger); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbCommitTSDataToDataTableEnd(committer); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbCommitTSDataToStt(SCommitter2 *committer) { + int32_t code = 0; + int32_t lino = 0; + + ASSERT(committer->sttReader == NULL); + + SMetaInfo info; + for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->iterMerger)) != NULL;) { if (row->uid != committer->ctx->tbid->uid) { committer->ctx->tbid->suid = row->suid; committer->ctx->tbid->uid = row->uid; // Ignore table of obsolescence - SMetaInfo info[1]; - if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, info, NULL) != 0) { + if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) { code = tsdbIterMergerSkipTableData(committer->iterMerger, committer->ctx->tbid); TSDB_CHECK_CODE(code, lino, _exit); continue; @@ -208,9 +420,37 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbCommitTSData(SCommitter2 *committer) { + int32_t code = 0; + int32_t lino = 0; + + if (committer->tsdb->imem->nRow == 0) goto _exit; + + // open iter and iter merger + code = tsdbCommitTSDataOpenIterMerger(committer); + TSDB_CHECK_CODE(code, lino, _exit); + + // loop iter + if (committer->sttTrigger == 1) { + code = tsdbCommitTSDataToData(committer); + TSDB_CHECK_CODE(code, lino, _exit); } else { - tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, vid, __func__, committer->ctx->fid, nRow); + code = tsdbCommitTSDataToStt(committer); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // close iter and iter merger + code = tsdbCommitTSDataCloseIterMerger(committer); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); } return code; } @@ -254,11 +494,6 @@ static int32_t tsdbCommitDelData(SCommitter2 *committer) { record->ekey = delData->eKey; } - if (!committer->sttWriter) { - code = tsdbCommitOpenWriter(committer); - TSDB_CHECK_CODE(code, lino, _exit); - } - code = tsdbSttFileWriteTombRecord(committer->sttWriter, record); TSDB_CHECK_CODE(code, lino, _exit); } @@ -275,7 +510,6 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; STsdb *tsdb = committer->tsdb; - int32_t vid = TD_VID(tsdb->pVnode); committer->ctx->fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision); committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now); @@ -300,10 +534,10 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } else { - tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", vid, __func__, - committer->ctx->fid, committer->ctx->minKey, committer->ctx->maxKey, committer->ctx->expLevel); + tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", TD_VID(tsdb->pVnode), + __func__, committer->ctx->fid, committer->ctx->minKey, committer->ctx->maxKey, committer->ctx->expLevel); } return 0; } @@ -312,6 +546,11 @@ static int32_t tsdbCommitFileSetEnd(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; + if (committer->dataWriter) { + code = tsdbDataFileWriterClose(&committer->dataWriter, 0, committer->fopArray); + TSDB_CHECK_CODE(code, lino, _exit); + } + code = tsdbSttFileWriterClose(&committer->sttWriter, 0, committer->fopArray); TSDB_CHECK_CODE(code, lino, _exit); @@ -330,7 +569,6 @@ _exit: static int32_t tsdbCommitFileSet(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(committer->tsdb->pVnode); // fset commit start code = tsdbCommitFileSetBegin(committer); @@ -349,9 +587,9 @@ static int32_t tsdbCommitFileSet(SCommitter2 *committer) { _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); } else { - tsdbDebug("vgId:%d %s done, fid:%d", vid, __func__, committer->ctx->fid); + tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid); } return code; } @@ -360,9 +598,8 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co int32_t code = 0; int32_t lino = 0; - SMemTable *mem = tsdb->imem; - memset(committer, 0, sizeof(committer[0])); + committer->tsdb = tsdb; code = tsdbFSCreateCopySnapshot(tsdb->pFS, &committer->fsetArr); TSDB_CHECK_CODE(code, lino, _exit); @@ -376,11 +613,11 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co committer->compactVersion = INT64_MAX; committer->ctx->cid = tsdbFSAllocEid(tsdb->pFS); committer->ctx->now = taosGetTimestampSec(); - TARRAY2_INIT(committer->fopArray); committer->ctx->nextKey = tsdb->imem->minKey; - if (mem->nDel > 0) { - SRBTreeIter iter[1] = {tRBTreeIterCreate(mem->tbDataTree, 1)}; + if (tsdb->imem->nDel > 0) { + SRBTreeIter iter[1] = {tRBTreeIterCreate(tsdb->imem->tbDataTree, 1)}; + for (SRBTreeNode *node = tRBTreeIterNext(iter); node; node = tRBTreeIterNext(iter)) { STbData *tbData = TCONTAINER_OF(node, STbData, rbtn); @@ -404,7 +641,6 @@ _exit: static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(committer->tsdb->pVnode); if (eno == 0) { code = tsdbFSEditBegin(committer->tsdb->pFS, committer->fopArray, TSDB_FEDIT_COMMIT); @@ -423,10 +659,10 @@ static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s, eid:%" PRId64, vid, __func__, lino, tstrerror(code), - committer->ctx->cid); + tsdbError("vgId:%d %s failed at line %d since %s, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, lino, + tstrerror(code), committer->ctx->cid); } else { - tsdbDebug("vgId:%d %s done, eid:%" PRId64, vid, __func__, committer->ctx->cid); + tsdbDebug("vgId:%d %s done, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->cid); } return code; } @@ -443,14 +679,14 @@ int32_t tsdbPreCommit(STsdb *tsdb) { int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) { if (!tsdb) return 0; - int32_t code = 0; - int32_t lino = 0; - int32_t vid = TD_VID(tsdb->pVnode); + int32_t code = 0; + int32_t lino = 0; + SMemTable *imem = tsdb->imem; int64_t nRow = imem->nRow; int64_t nDel = imem->nDel; - if (!nRow && !nDel) { + if (nRow == 0 && nDel == 0) { taosThreadRwlockWrlock(&tsdb->rwLock); tsdb->imem = NULL; taosThreadRwlockUnlock(&tsdb->rwLock); @@ -472,9 +708,9 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) { _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } else { - tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, vid, __func__, nRow, nDel); + tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(tsdb->pVnode), __func__, nRow, nDel); } return code; } @@ -508,7 +744,6 @@ _exit: int32_t tsdbCommitAbort(STsdb *pTsdb) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(pTsdb->pVnode); if (pTsdb->imem == NULL) goto _exit; @@ -517,9 +752,9 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) { _exit: if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { - tsdbInfo("vgId:%d %s done", vid, __func__); + tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); } return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c index 9eae73729d..a39cb585e9 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c @@ -600,7 +600,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { } // check if need to merge - if (fs->mergeTaskOn == false) { + if (fs->tsdb->pVnode->config.sttTrigger > 1 && fs->mergeTaskOn == false) { STFileSet *fset; TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) { if (TARRAY2_SIZE(fset->lvlArr) == 0) continue; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index eb4fb18ddd..79f2067af5 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -570,7 +570,6 @@ static int32_t tsdbMergeFileSetEndCloseReader(SMerger *merger) { static int32_t tsdbMergeFileSetEnd(SMerger *merger) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(merger->tsdb->pVnode); code = tsdbMergeFileSetEndCloseWriter(merger); TSDB_CHECK_CODE(code, lino, _exit); @@ -583,7 +582,7 @@ static int32_t tsdbMergeFileSetEnd(SMerger *merger) { _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code); } return code; }