tsdb/file: new s3flag in mem
This commit is contained in:
parent
84d2f9c379
commit
800cbfbb05
|
@ -720,7 +720,7 @@ static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
|
||||||
mndReleaseVgroup(pMnode, pVgroup);
|
mndReleaseVgroup(pMnode, pVgroup);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
mndUpdClusterInfo(pReq);
|
mndUpdClusterInfo(pReq);
|
||||||
_OVER:
|
_OVER:
|
||||||
tFreeSNotifyReq(¬ifyReq);
|
tFreeSNotifyReq(¬ifyReq);
|
||||||
return code;
|
return code;
|
||||||
|
@ -1235,6 +1235,38 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
strcpy(dcfgReq.config, "monitor");
|
strcpy(dcfgReq.config, "monitor");
|
||||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||||
|
} else if (strncasecmp(cfgReq.config, "s3blockcachesize", 16) == 0) {
|
||||||
|
int32_t optLen = strlen("s3blockcachesize");
|
||||||
|
int32_t flag = -1;
|
||||||
|
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||||
|
if (code < 0) return code;
|
||||||
|
|
||||||
|
if (flag < 4 || flag > 1024 * 1024) {
|
||||||
|
mError("dnode:%d, failed to config s3BlockCacheSize since value:%d. Valid range: [4, 1024 * 1024]",
|
||||||
|
cfgReq.dnodeId, flag);
|
||||||
|
terrno = TSDB_CODE_INVALID_CFG;
|
||||||
|
tFreeSMCfgDnodeReq(&cfgReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
strcpy(dcfgReq.config, "s3blockcachesize");
|
||||||
|
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||||
|
} else if (strncasecmp(cfgReq.config, "s3uploaddelaysec", 16) == 0) {
|
||||||
|
int32_t optLen = strlen("s3uploaddelaysec");
|
||||||
|
int32_t flag = -1;
|
||||||
|
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||||
|
if (code < 0) return code;
|
||||||
|
|
||||||
|
if (flag < 600 || flag > 60 * 60 * 24 * 30) {
|
||||||
|
mError("dnode:%d, failed to config s3UploadDelaySec since value:%d. Valid range: [600, 60 * 60 * 24 * 30]",
|
||||||
|
cfgReq.dnodeId, flag);
|
||||||
|
terrno = TSDB_CODE_INVALID_CFG;
|
||||||
|
tFreeSMCfgDnodeReq(&cfgReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
strcpy(dcfgReq.config, "s3uploaddelaysec");
|
||||||
|
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||||
} else if (strncasecmp(cfgReq.config, "ttlpushinterval", 14) == 0) {
|
} else if (strncasecmp(cfgReq.config, "ttlpushinterval", 14) == 0) {
|
||||||
int32_t optLen = strlen("ttlpushinterval");
|
int32_t optLen = strlen("ttlpushinterval");
|
||||||
int32_t flag = -1;
|
int32_t flag = -1;
|
||||||
|
|
|
@ -405,9 +405,9 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
|
||||||
long s3Size(const char *object_name);
|
long s3Size(const char *object_name);
|
||||||
int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs);
|
int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs);
|
||||||
committer->ctx->skipTsRow = false;
|
committer->ctx->skipTsRow = false;
|
||||||
if (tsS3Enabled && nlevel > 1 /* && committer->ctx->did.level == nlevel - 1*/) {
|
if (tsS3Enabled && nlevel > 1 && committer->ctx->fset) {
|
||||||
STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA];
|
STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA];
|
||||||
if (committer->ctx->fset && fobj) {
|
if (fobj && fobj->f->did.level == nlevel - 1) {
|
||||||
// if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay
|
// if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay
|
||||||
const char *object_name = taosDirEntryBaseName((char *)fobj->fname);
|
const char *object_name = taosDirEntryBaseName((char *)fobj->fname);
|
||||||
|
|
||||||
|
@ -421,7 +421,6 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
|
||||||
committer->ctx->skipTsRow = true;
|
committer->ctx->skipTsRow = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// new fset can be written with ts data
|
// new fset can be written with ts data
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -303,6 +303,7 @@ bool tsdbIsSameTFile(const STFile *f1, const STFile *f2) {
|
||||||
if (f1->did.id != f2->did.id) return false;
|
if (f1->did.id != f2->did.id) return false;
|
||||||
if (f1->fid != f2->fid) return false;
|
if (f1->fid != f2->fid) return false;
|
||||||
if (f1->cid != f2->cid) return false;
|
if (f1->cid != f2->cid) return false;
|
||||||
|
if (f1->s3flag != f2->s3flag) return false;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,6 +58,7 @@ int32_t tsdbTFileObjCmpr(const STFileObj **fobj1, const STFileObj **fobj2);
|
||||||
struct STFile {
|
struct STFile {
|
||||||
tsdb_ftype_t type;
|
tsdb_ftype_t type;
|
||||||
SDiskID did; // disk id
|
SDiskID did; // disk id
|
||||||
|
int32_t s3flag;
|
||||||
int32_t fid; // file id
|
int32_t fid; // file id
|
||||||
int64_t cid; // commit id
|
int64_t cid; // commit id
|
||||||
int64_t size;
|
int64_t size;
|
||||||
|
|
|
@ -211,6 +211,8 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, const
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
op.nf.s3flag = true;
|
||||||
|
|
||||||
code = TARRAY2_APPEND(rtner->fopArr, op);
|
code = TARRAY2_APPEND(rtner->fopArr, op);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue