From 47b961018c26d16752eeb3bb4a771d48cd36319a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 11 Apr 2023 11:00:35 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/dev/tsdbCommit.c | 54 +++++++++++++------ source/dnode/vnode/src/tsdb/dev/tsdbFS.c | 12 ++--- .../dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c | 2 +- .../dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h | 12 ++--- source/dnode/vnode/src/vnd/vnodeCommit.c | 7 ++- 5 files changed, 57 insertions(+), 30 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index 8f5467b7d6..19c1b7fed1 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -43,11 +43,11 @@ static int32_t open_committer_writer(SCommitter *pCommitter) { struct SSttFWriterConf conf = { .pTsdb = pCommitter->pTsdb, - .pSkmTb = NULL, - .pSkmRow = NULL, .maxRow = pCommitter->maxRow, .szPage = pCommitter->pTsdb->pVnode->config.tsdbPageSize, .cmprAlg = pCommitter->cmprAlg, + .pSkmTb = NULL, + .pSkmRow = NULL, .aBuf = NULL, }; @@ -134,19 +134,28 @@ static int32_t commit_timeseries_data(SCommitter *pCommitter) { break; } - nRow++; - code = tsdbCommitWriteTSData(pCommitter, (TABLEID *)pTbData, pRow); TSDB_CHECK_CODE(code, lino, _exit); + + nRow++; } } _exit: if (code) { - tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code)); + tsdbError( // + "vgId:%d %s failed at line %d since %s", // + TD_VID(pCommitter->pTsdb->pVnode), // + __func__, // + lino, // + tstrerror(code)); } else { - tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, - nRow); + tsdbDebug( // + "vgId:%d %s done, fid:%d nRow:%" PRId64, // + TD_VID(pCommitter->pTsdb->pVnode), // + __func__, // + pCommitter->fid, // + nRow); } return code; } @@ -155,7 +164,7 @@ static int32_t commit_delete_data(SCommitter *pCommitter) { int32_t code = 0; int32_t lino; - // ASSERTS(0, "TODO: Not implemented yet"); + ASSERTS(0, "TODO: Not implemented yet"); int64_t nDel = 0; SMemTable *pMem = pCommitter->pTsdb->imem; @@ -197,9 +206,14 @@ static int32_t start_commit_file_set(SCommitter *pCommitter) { pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec()); pCommitter->nextKey = TSKEY_MAX; - tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", - TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, pCommitter->minKey, pCommitter->maxKey, - pCommitter->expLevel); + tsdbDebug( // + "vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", // + TD_VID(pCommitter->pTsdb->pVnode), // + __func__, // + pCommitter->fid, // + pCommitter->minKey, // + pCommitter->maxKey, // + pCommitter->expLevel); return 0; } @@ -228,8 +242,10 @@ static int32_t commit_next_file_set(SCommitter *pCommitter) { code = commit_timeseries_data(pCommitter); TSDB_CHECK_CODE(code, lino, _exit); + /* TODO code = commit_delete_data(pCommitter); TSDB_CHECK_CODE(code, lino, _exit); + */ // fset commit end code = end_commit_file_set(pCommitter); @@ -255,14 +271,14 @@ static int32_t open_committer(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCom pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows; pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows; pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression; - pCommitter->sttTrigger = 0; // TODO + pCommitter->sttTrigger = 7; // TODO pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem); if (pCommitter->aTbDataP == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } - pCommitter->aFileOp = taosArrayInit(10, sizeof(struct SFileOp)); + pCommitter->aFileOp = taosArrayInit(16, sizeof(struct SFileOp)); if (pCommitter->aFileOp == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); @@ -273,9 +289,17 @@ static int32_t open_committer(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCom _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError( // + "vgId:%d %s failed at line %d since %s", // + TD_VID(pTsdb->pVnode), // + __func__, // + lino, // + tstrerror(code)); } else { - tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); + tsdbDebug( // + "vgId:%d %s done", // + TD_VID(pTsdb->pVnode), // + __func__); } return code; } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c index a0af60c052..0e7a0a53aa 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c @@ -117,9 +117,9 @@ static int32_t fs_to_json_str(struct STFileSystem *pFS, char **ppData) { } /* format version */ - TSDB_CHECK_NULL( // - cJSON_AddNumberToObject(pJson, // - "format", // + TSDB_CHECK_NULL( // + cJSON_AddNumberToObject(pJson, // + "version", // 1 /* TODO */), code, // lino, // @@ -127,9 +127,9 @@ static int32_t fs_to_json_str(struct STFileSystem *pFS, char **ppData) { TSDB_CODE_OUT_OF_MEMORY); /* next edit id */ - TSDB_CHECK_NULL( // - cJSON_AddNumberToObject(pJson, // - "next edit id", // + TSDB_CHECK_NULL( // + cJSON_AddNumberToObject(pJson, // + "edit id", // pFS->nextEditId), code, // lino, // diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c index d67a41626c..16437b4e59 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c @@ -394,7 +394,7 @@ _exit: return code; } -int32_t tsdbSttFWriterClose(struct SSttFWriter **ppWriter) { +int32_t tsdbSttFWriterClose(struct SSttFWriter **ppWriter, int8_t abort, struct SFileOp *op) { int32_t vgId = TD_VID(ppWriter[0]->config.pTsdb->pVnode); int32_t code = 0; int32_t lino; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h index 42203617f4..6c199476f2 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h @@ -23,13 +23,6 @@ extern "C" { #endif struct SSttFWriter; -struct SSttFWriterConf; - -int32_t tsdbSttFWriterOpen(const struct SSttFWriterConf *pConf, struct SSttFWriter **ppWriter); -int32_t tsdbSttFWriterClose(struct SSttFWriter **ppWriter); -int32_t tsdbSttFWriteTSData(struct SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow); -int32_t tsdbSttFWriteDLData(struct SSttFWriter *pWriter, TABLEID *tbid, SDelData *pDelData); - struct SSttFWriterConf { STsdb *pTsdb; struct STFile file; @@ -41,6 +34,11 @@ struct SSttFWriterConf { uint8_t **aBuf; }; +int32_t tsdbSttFWriterOpen(const struct SSttFWriterConf *pConf, struct SSttFWriter **ppWriter); +int32_t tsdbSttFWriterClose(struct SSttFWriter **ppWriter, int8_t abort, struct SFileOp *op); +int32_t tsdbSttFWriteTSData(struct SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow); +int32_t tsdbSttFWriteDLData(struct SSttFWriter *pWriter, TABLEID *tbid, SDelData *pDelData); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index bdda6b3c7e..1b62c48859 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -19,6 +19,8 @@ #ifdef USE_DEV_CODE extern int32_t tsdbPreCommit(STsdb *pTsdb); extern int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo); +extern int32_t tsdbCommitCommit(STsdb *pTsdb); +extern int32_t tsdbCommitAbort(STsdb *pTsdb); #endif #define VND_INFO_FNAME_TMP "vnode_tmp.json" @@ -455,7 +457,6 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed); -// commit each sub-system #ifdef USE_DEV_CODE code = tsdbCommitBegin(pVnode->pTsdb, pInfo); #else @@ -479,7 +480,11 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { TSDB_CHECK_CODE(code, lino, _exit); } +#ifdef USE_DEV_CODE + code = tsdbCommitCommit(pVnode->pTsdb); +#else code = tsdbFinishCommit(pVnode->pTsdb); +#endif TSDB_CHECK_CODE(code, lino, _exit); if (VND_IS_RSMA(pVnode)) {