From cb3fe4d6de71fe81ddd986f466660a1fe44328b4 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 14 Nov 2023 14:15:02 +0800 Subject: [PATCH] fix(tsdb/merge): skip merge if data file cannot be written --- source/dnode/vnode/src/tsdb/tsdbCommit2.c | 3 +- source/dnode/vnode/src/tsdb/tsdbFS2.c | 34 +++++++++++++++++++-- source/dnode/vnode/src/tsdb/tsdbFile2.c | 1 + source/dnode/vnode/src/tsdb/tsdbRetention.c | 5 +-- 4 files changed, 37 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 22fb3b84ec..48b622e324 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -409,7 +409,6 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { extern int32_t tsS3UploadDelaySec; long s3Size(const char *object_name); int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs); - committer->ctx->skipTsRow = false; if (tsS3Enabled && nlevel > 1 && committer->ctx->fset) { STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA]; if (fobj && fobj->f->did.level == nlevel - 1) { @@ -670,4 +669,4 @@ _exit: tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); } return code; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 02ef75ae86..a3b3ffc1c8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -882,8 +882,38 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { int32_t numFile = TARRAY2_SIZE(lvl->fobjArr); if (numFile >= sttTrigger) { // launch merge - code = tsdbSchedMerge(fs->tsdb, fset->fid); - TSDB_CHECK_CODE(code, lino, _exit); + bool skipMerge = false; + { + int32_t now = taosGetTimestampSec(); + + extern int8_t tsS3Enabled; + extern int32_t tsS3UploadDelaySec; + long s3Size(const char *object_name); + int32_t nlevel = tfsGetLevel(fs->tsdb->pVnode->pTfs); + if (tsS3Enabled && nlevel > 1) { + STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA]; + if (fobj && fobj->f->did.level == nlevel - 1) { + // if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay + const char *object_name = taosDirEntryBaseName((char *)fobj->fname); + + if (taosCheckExistFile(fobj->fname)) { + int32_t mtime = 0; + taosStatFile(fobj->fname, NULL, &mtime, NULL); + if (mtime < now - tsS3UploadDelaySec) { + skipMerge = true; + } + } else if (s3Size(object_name) > 0) { + skipMerge = true; + } + } + // new fset can be written with ts data + } + } + + if (!skipMerge) { + code = tsdbSchedMerge(fs->tsdb, fset->fid); + TSDB_CHECK_CODE(code, lino, _exit); + } } if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) { diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.c b/source/dnode/vnode/src/tsdb/tsdbFile2.c index cc05b8ee18..8cd9304188 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.c @@ -50,6 +50,7 @@ void remove_file(const char *fname, bool last_level) { long s3_size = tsS3Enabled ? s3Size(object_name) : 0; if (!strncmp(fname + strlen(fname) - 5, ".data", 5) && s3_size > 0) { s3DeleteObjects(&object_name, 1); + tsdbInfo("file:%s is removed from s3", fname); } else { tsdbError("file:%s remove failed", fname); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index b61f17a52a..bc76377597 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -103,7 +103,7 @@ static int32_t tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile char fname[TSDB_FILENAME_LEN]; TdFilePtr fdFrom = NULL; - TdFilePtr fdTo = NULL; + // TdFilePtr fdTo = NULL; tsdbTFileName(rtner->tsdb, to, fname); @@ -359,7 +359,8 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) { s3EvictCache(fobj->fname, fsize * 2); } */ - tsdbInfo("file:%s size: %" PRId64 " do migrate", fobj->fname, fobj->f->size); + tsdbInfo("file:%s size: %" PRId64 " do migrate from %d to %d", fobj->fname, fobj->f->size, fobj->f->did.level, + did.level); code = tsdbDoMigrateFileObj(rtner, fobj, &did); TSDB_CHECK_CODE(code, lino, _exit);