fix(tsdb/merge): skip merge if data file cannot be written

This commit is contained in:
Minglei Jin 2023-11-14 14:15:02 +08:00
parent 9167a07e89
commit cb3fe4d6de
4 changed files with 37 additions and 6 deletions

View File

@ -409,7 +409,6 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
extern int32_t tsS3UploadDelaySec;
long s3Size(const char *object_name);
int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs);
committer->ctx->skipTsRow = false;
if (tsS3Enabled && nlevel > 1 && committer->ctx->fset) {
STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA];
if (fobj && fobj->f->did.level == nlevel - 1) {
@ -670,4 +669,4 @@ _exit:
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
return code;
}
}

View File

@ -882,8 +882,38 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
if (numFile >= sttTrigger) {
// launch merge
code = tsdbSchedMerge(fs->tsdb, fset->fid);
TSDB_CHECK_CODE(code, lino, _exit);
bool skipMerge = false;
{
int32_t now = taosGetTimestampSec();
extern int8_t tsS3Enabled;
extern int32_t tsS3UploadDelaySec;
long s3Size(const char *object_name);
int32_t nlevel = tfsGetLevel(fs->tsdb->pVnode->pTfs);
if (tsS3Enabled && nlevel > 1) {
STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA];
if (fobj && fobj->f->did.level == nlevel - 1) {
// if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay
const char *object_name = taosDirEntryBaseName((char *)fobj->fname);
if (taosCheckExistFile(fobj->fname)) {
int32_t mtime = 0;
taosStatFile(fobj->fname, NULL, &mtime, NULL);
if (mtime < now - tsS3UploadDelaySec) {
skipMerge = true;
}
} else if (s3Size(object_name) > 0) {
skipMerge = true;
}
}
// new fset can be written with ts data
}
}
if (!skipMerge) {
code = tsdbSchedMerge(fs->tsdb, fset->fid);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) {

View File

@ -50,6 +50,7 @@ void remove_file(const char *fname, bool last_level) {
long s3_size = tsS3Enabled ? s3Size(object_name) : 0;
if (!strncmp(fname + strlen(fname) - 5, ".data", 5) && s3_size > 0) {
s3DeleteObjects(&object_name, 1);
tsdbInfo("file:%s is removed from s3", fname);
} else {
tsdbError("file:%s remove failed", fname);
}

View File

@ -103,7 +103,7 @@ static int32_t tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile
char fname[TSDB_FILENAME_LEN];
TdFilePtr fdFrom = NULL;
TdFilePtr fdTo = NULL;
// TdFilePtr fdTo = NULL;
tsdbTFileName(rtner->tsdb, to, fname);
@ -359,7 +359,8 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) {
s3EvictCache(fobj->fname, fsize * 2);
}
*/
tsdbInfo("file:%s size: %" PRId64 " do migrate", fobj->fname, fobj->f->size);
tsdbInfo("file:%s size: %" PRId64 " do migrate from %d to %d", fobj->fname, fobj->f->size, fobj->f->did.level,
did.level);
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);