Merge pull request #23680 from taosdata/fix/TD-27273
fix(tsdb/merge): skip merge if data file cannot be written
This commit is contained in:
commit
f7e63ad1ca
|
@ -409,7 +409,6 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
|
|||
extern int32_t tsS3UploadDelaySec;
|
||||
long s3Size(const char *object_name);
|
||||
int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs);
|
||||
committer->ctx->skipTsRow = false;
|
||||
if (tsS3Enabled && nlevel > 1 && committer->ctx->fset) {
|
||||
STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA];
|
||||
if (fobj && fobj->f->did.level == nlevel - 1) {
|
||||
|
|
|
@ -882,9 +882,39 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
|
|||
int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
|
||||
if (numFile >= sttTrigger) {
|
||||
// launch merge
|
||||
bool skipMerge = false;
|
||||
{
|
||||
int32_t now = taosGetTimestampSec();
|
||||
|
||||
extern int8_t tsS3Enabled;
|
||||
extern int32_t tsS3UploadDelaySec;
|
||||
long s3Size(const char *object_name);
|
||||
int32_t nlevel = tfsGetLevel(fs->tsdb->pVnode->pTfs);
|
||||
if (tsS3Enabled && nlevel > 1) {
|
||||
STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA];
|
||||
if (fobj && fobj->f->did.level == nlevel - 1) {
|
||||
// if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay
|
||||
const char *object_name = taosDirEntryBaseName((char *)fobj->fname);
|
||||
|
||||
if (taosCheckExistFile(fobj->fname)) {
|
||||
int32_t mtime = 0;
|
||||
taosStatFile(fobj->fname, NULL, &mtime, NULL);
|
||||
if (mtime < now - tsS3UploadDelaySec) {
|
||||
skipMerge = true;
|
||||
}
|
||||
} else if (s3Size(object_name) > 0) {
|
||||
skipMerge = true;
|
||||
}
|
||||
}
|
||||
// new fset can be written with ts data
|
||||
}
|
||||
}
|
||||
|
||||
if (!skipMerge) {
|
||||
code = tsdbSchedMerge(fs->tsdb, fset->fid);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) {
|
||||
tsdbFSSetBlockCommit(fset, true);
|
||||
|
|
|
@ -50,6 +50,7 @@ void remove_file(const char *fname, bool last_level) {
|
|||
long s3_size = tsS3Enabled ? s3Size(object_name) : 0;
|
||||
if (!strncmp(fname + strlen(fname) - 5, ".data", 5) && s3_size > 0) {
|
||||
s3DeleteObjects(&object_name, 1);
|
||||
tsdbInfo("file:%s is removed from s3", fname);
|
||||
} else {
|
||||
tsdbError("file:%s remove failed", fname);
|
||||
}
|
||||
|
|
|
@ -103,7 +103,7 @@ static int32_t tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile
|
|||
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
TdFilePtr fdFrom = NULL;
|
||||
TdFilePtr fdTo = NULL;
|
||||
// TdFilePtr fdTo = NULL;
|
||||
|
||||
tsdbTFileName(rtner->tsdb, to, fname);
|
||||
|
||||
|
@ -359,7 +359,8 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) {
|
|||
s3EvictCache(fobj->fname, fsize * 2);
|
||||
}
|
||||
*/
|
||||
tsdbInfo("file:%s size: %" PRId64 " do migrate", fobj->fname, fobj->f->size);
|
||||
tsdbInfo("file:%s size: %" PRId64 " do migrate from %d to %d", fobj->fname, fobj->f->size, fobj->f->did.level,
|
||||
did.level);
|
||||
|
||||
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
|
Loading…
Reference in New Issue