From 7bc38466bfad4d3257c22d6ed8bd23289cf987a1 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 23 Mar 2023 16:10:08 +0800 Subject: [PATCH] more code --- cmake/cmake.options | 11 +++ source/dnode/vnode/src/tsdb/dev/tsdbCommit2.c | 62 +++++++++------ .../vnode/src/tsdb/dev/tsdbReaderWriter2.c | 75 ++++++++++++++++--- source/dnode/vnode/src/tsdb/tsdbUtil.c | 6 +- source/dnode/vnode/src/vnd/vnodeCommit.c | 15 +++- 5 files changed, 130 insertions(+), 39 deletions(-) diff --git a/cmake/cmake.options b/cmake/cmake.options index 60ff00affc..f803b111ca 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -171,3 +171,14 @@ option( ON ) + +# open this flag to use dev code, make sure it is off in release version +option( + USE_DEV_CODE + "If use dev code" + ON +) + +if (${USE_DEV_CODE}) + add_definitions(-DUSE_DEV_CODE) +endif(USE_DEV_CODE) \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit2.c index 7001f9c8da..8438821943 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit2.c @@ -18,9 +18,9 @@ // extern dependencies typedef struct SSttFWriter SSttFWriter; -extern int32_t tsdbSttFWriterOpen(STsdb *pTsdb, SSttFile *pSttFile, SSttFWriter **ppWritter); -extern int32_t tsdbSttFWriterClose(SSttFWriter *pWritter); -extern int32_t tsdbSttFWriteRow(SSttFWriter *pWritter, int64_t suid, int64_t uid, TSDBROW *pRow); +extern int32_t tsdbSttFWriterOpen(STsdb *pTsdb, const SSttFile *pSttFile, SSttFWriter **ppWriter); +extern int32_t tsdbSttFWriterClose(SSttFWriter *pWriter); +extern int32_t tsdbSttFWriteTSData(SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow); typedef struct { STsdb *pTsdb; @@ -43,12 +43,23 @@ typedef struct { } SCommitter; static int32_t tsdbCommitOpenWriter(SCommitter *pCommitter) { - int32_t code = 0; - // TODO + int32_t code; + int32_t lino; + + SSttFile sttFile = {0}; // TODO + + code = tsdbSttFWriterOpen(pCommitter->pTsdb, &sttFile, &pCommitter->pWriter); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tstrerror(code), pCommitter->fid); + } return code; } -static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, int64_t suid, int64_t uid, TSDBROW *pRow) { +static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDBROW *pRow) { int32_t code = 0; int32_t lino; @@ -57,7 +68,7 @@ static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, int64_t suid, int64 TSDB_CHECK_CODE(code, lino, _exit); } - code = tsdbSttFWriteRow(pCommitter->pWriter, suid, uid, pRow); + code = tsdbSttFWriteTSData(pCommitter->pWriter, tbid, pRow); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -65,7 +76,7 @@ _exit: tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code)); } else { tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64, - TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, suid, uid, TSDBROW_KEY(pRow).ts, + TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, tbid->suid, tbid->uid, TSDBROW_KEY(pRow).ts, TSDBROW_KEY(pRow).version); } return 0; @@ -106,7 +117,7 @@ static int32_t tsdbCommitTSData(SCommitter *pCommitter) { nRow++; - code = tsdbCommitWriteTSData(pCommitter, pTbData->suid, pTbData->uid, pRow); + code = tsdbCommitWriteTSData(pCommitter, (TABLEID *)pTbData, pRow); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -161,22 +172,16 @@ _exit: } static int32_t tsdbCommitFSetStart(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - pCommitter->fid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision); tsdbFidKeyRange(pCommitter->fid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey, &pCommitter->maxKey); pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec()); pCommitter->nextKey = TSKEY_MAX; - // TODO - -_exit: - if (code) { - tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code)); - } - return code; + tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", + TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, pCommitter->minKey, pCommitter->maxKey, + pCommitter->expLevel); + return 0; } static int32_t tsdbCommitFSetEnd(SCommitter *pCommitter) { @@ -221,13 +226,26 @@ _exit: static int32_t tsdbCommitterOpen(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCommitter) { int32_t code = 0; - int32_t lino = 0; + int32_t lino; + // set config memset(pCommitter, 0, sizeof(SCommitter)); pCommitter->pTsdb = pTsdb; - pCommitter->nextKey = pTsdb->imem->minKey; // TODO + pCommitter->minutes = pTsdb->keepCfg.days; + pCommitter->precision = pTsdb->keepCfg.precision; + pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows; + pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows; + pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression; + pCommitter->sttTrigger = 0; // TODO - // TODO + pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem); + if (pCommitter->aTbDataP == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // start loop + pCommitter->nextKey = pTsdb->imem->minKey; // TODO _exit: if (code) { diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbReaderWriter2.c b/source/dnode/vnode/src/tsdb/dev/tsdbReaderWriter2.c index 8a6bbe897c..64cff95702 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbReaderWriter2.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbReaderWriter2.c @@ -7,44 +7,97 @@ extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsd extern void tsdbCloseFile(STsdbFD **ppFD); struct SSttFWriter { STsdb *pTsdb; - STsdbFD *pFd; SSttFile file; SBlockData bData; SArray *aSttBlk; + STsdbFD *pFd; + SSkmInfo *pSkmTb; + SSkmInfo *pSkmRow; + int32_t maxRow; }; -int32_t tsdbSttFWriterOpen(STsdb *pTsdb, SSttFile *pSttFile, SSttFWriter **ppWritter) { +static int32_t tsdbSttFWriteTSDataBlock(SSttFWriter *pWriter) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbSttFWriterOpen(STsdb *pTsdb, const SSttFile *pSttFile, SSttFWriter **ppWriter) { int32_t code = 0; int32_t lino = 0; - int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; - int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; // TODO + int32_t flag = TD_FILE_READ | TD_FILE_WRITE; + char fname[TSDB_FILENAME_LEN]; - ppWritter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter)); - if (ppWritter[0] == NULL) { + ppWriter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter)); + if (ppWriter[0] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } - ppWritter[0]->pTsdb = pTsdb; - ppWritter[0]->file = pSttFile[0]; + ppWriter[0]->pTsdb = pTsdb; + ppWriter[0]->file = pSttFile[0]; - code = tsdbOpenFile(NULL, szPage, flag, &ppWritter[0]->pFd); + tBlockDataCreate(&ppWriter[0]->bData); + ppWriter[0]->aSttBlk = taosArrayInit(64, sizeof(SSttBlk)); + if (ppWriter[0]->aSttBlk == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (pSttFile->size == 0) flag |= (TD_FILE_CREATE | TD_FILE_TRUNC); + tsdbSttFileName(pTsdb, (SDiskID){0}, 0, &ppWriter[0]->file, fname); // TODO + code = tsdbOpenFile(fname, szPage, flag, &ppWriter[0]->pFd); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + if (ppWriter[0]) { + tBlockDataDestroy(&ppWriter[0]->bData); + taosArrayDestroy(ppWriter[0]->aSttBlk); + taosMemoryFree(ppWriter[0]); + ppWriter[0] = NULL; + } } return 0; } -int32_t tsdbSttFWriterClose(SSttFWriter *pWritter) { +int32_t tsdbSttFWriterClose(SSttFWriter *pWriter) { // TODO return 0; } -int32_t tsdbSttFWriteRow(SSttFWriter *pWritter, int64_t suid, int64_t uid, TSDBROW *pRow) { +int32_t tsdbSttFWriteTSData(SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow) { + int32_t code = 0; + int32_t lino = 0; + + if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) { + if (pWriter->bData.nRow > 0) { + code = tsdbSttFWriteTSDataBlock(pWriter); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tBlockDataInit(&pWriter->bData, tbid, NULL /* TODO */, NULL, 0); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tBlockDataAppendRow(&pWriter->bData, pRow, NULL /* TODO */, tbid->uid); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pWriter->bData.nRow >= pWriter->maxRow) { + code = tsdbSttFWriteTSDataBlock(pWriter); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return 0; +} + +int32_t tsdbSttFWriteDLData(SSttFWriter *pWriter, int64_t suid, int64_t uid, int64_t version) { int32_t code = 0; int32_t lino = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index c323ae1532..fbe0364e36 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1161,8 +1161,6 @@ int32_t tsdbBuildDeleteSkyline2(SArray *aDelData, int32_t sidx, int32_t eidx, SA // SBlockData ====================================================== int32_t tBlockDataCreate(SBlockData *pBlockData) { - int32_t code = 0; - pBlockData->suid = 0; pBlockData->uid = 0; pBlockData->nRow = 0; @@ -1171,9 +1169,7 @@ int32_t tBlockDataCreate(SBlockData *pBlockData) { pBlockData->aTSKEY = NULL; pBlockData->nColData = 0; pBlockData->aColData = NULL; - -_exit: - return code; + return 0; } void tBlockDataDestroy(SBlockData *pBlockData) { diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 0779388ba9..5dc9c1a7d0 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -16,6 +16,11 @@ #include "vnd.h" #include "vnodeInt.h" +#ifdef USE_DEV_CODE +extern int32_t tsdbPreCommit(STsdb *pTsdb); +extern int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo); +#endif + #define VND_INFO_FNAME_TMP "vnode_tmp.json" static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); @@ -314,7 +319,11 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { TSDB_CHECK_CODE(code, lino, _exit); } +#ifdef USE_DEV_CODE + tsdbPreCommit(pVnode->pTsdb); +#else tsdbPrepareCommit(pVnode->pTsdb); +#endif metaPrepareAsyncCommit(pVnode->pMeta); @@ -449,8 +458,12 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed); - // commit each sub-system +// commit each sub-system +#ifdef USE_DEV_CODE + code = tsdbCommitBegin(pVnode->pTsdb, pInfo); +#else code = tsdbCommit(pVnode->pTsdb, pInfo); +#endif TSDB_CHECK_CODE(code, lino, _exit); if (VND_IS_RSMA(pVnode)) {