tsdb/upload: delay data file uploading
This commit is contained in:
parent
4fede02da4
commit
84d2f9c379
|
@ -120,7 +120,7 @@ char tsSmlTsDefaultName[TSDB_COL_NAME_LEN] = "_ts";
|
||||||
char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null";
|
char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null";
|
||||||
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table name can be specified in tag value.
|
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table name can be specified in tag value.
|
||||||
char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = "";
|
char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = "";
|
||||||
// If set to empty system will generate table name using MD5 hash.
|
// If set to empty system will generate table name using MD5 hash.
|
||||||
// true means that the name and order of cols in each line are the same(only for influx protocol)
|
// true means that the name and order of cols in each line are the same(only for influx protocol)
|
||||||
// bool tsSmlDataFormat = false;
|
// bool tsSmlDataFormat = false;
|
||||||
// int32_t tsSmlBatchSize = 10000;
|
// int32_t tsSmlBatchSize = 10000;
|
||||||
|
@ -273,6 +273,7 @@ char tsS3Hostname[TSDB_FQDN_LEN] = "<hostname>";
|
||||||
|
|
||||||
int32_t tsS3BlockSize = 4096; // number of tsdb pages
|
int32_t tsS3BlockSize = 4096; // number of tsdb pages
|
||||||
int32_t tsS3BlockCacheSize = 16; // number of blocks
|
int32_t tsS3BlockCacheSize = 16; // number of blocks
|
||||||
|
int32_t tsS3UploadDelaySec = 60 * 60;
|
||||||
|
|
||||||
int32_t tsCheckpointInterval = 300;
|
int32_t tsCheckpointInterval = 300;
|
||||||
|
|
||||||
|
@ -453,7 +454,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
||||||
if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
if (cfgAddBool(pCfg, "keepColumnName", tsKeepColumnName, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddBool(pCfg, "keepColumnName", tsKeepColumnName, CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "smlChildTableName", tsSmlChildTableName, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddString(pCfg, "smlChildTableName", tsSmlChildTableName, CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "smlAutoChildTableNameDelimiter", tsSmlAutoChildTableNameDelimiter, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddString(pCfg, "smlAutoChildTableNameDelimiter", tsSmlAutoChildTableNameDelimiter, CFG_SCOPE_CLIENT) != 0)
|
||||||
|
return -1;
|
||||||
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "smlTsDefaultName", tsSmlTsDefaultName, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddString(pCfg, "smlTsDefaultName", tsSmlTsDefaultName, CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
if (cfgAddBool(pCfg, "smlDot2Underline", tsSmlDot2Underline, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddBool(pCfg, "smlDot2Underline", tsSmlDot2Underline, CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
|
@ -665,7 +667,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
|
|
||||||
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointTickInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointTickInterval, 60, 1200, CFG_SCOPE_SERVER) != 0)
|
||||||
|
return -1;
|
||||||
|
|
||||||
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0)
|
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0)
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -688,6 +691,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, 2048, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, 2048, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
|
if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 60 * 10, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER) != 0)
|
||||||
|
return -1;
|
||||||
|
|
||||||
// min free disk space used to check if the disk is full [50MB, 1GB]
|
// min free disk space used to check if the disk is full [50MB, 1GB]
|
||||||
if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024,
|
if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024,
|
||||||
|
@ -946,7 +951,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str, TSDB_TABLE_NAME_LEN);
|
tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str,
|
||||||
|
TSDB_TABLE_NAME_LEN);
|
||||||
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
|
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
|
||||||
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
||||||
tstrncpy(tsSmlTsDefaultName, cfgGetItem(pCfg, "smlTsDefaultName")->str, TSDB_COL_NAME_LEN);
|
tstrncpy(tsSmlTsDefaultName, cfgGetItem(pCfg, "smlTsDefaultName")->str, TSDB_COL_NAME_LEN);
|
||||||
|
@ -1116,6 +1122,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
|
|
||||||
tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32;
|
tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32;
|
||||||
tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32;
|
tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32;
|
||||||
|
tsS3UploadDelaySec = cfgGetItem(pCfg, "s3UploadDelaySec")->i32;
|
||||||
|
|
||||||
GRANT_CFG_GET;
|
GRANT_CFG_GET;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1413,7 +1420,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
|
||||||
} else if (strcasecmp("smlChildTableName", name) == 0) {
|
} else if (strcasecmp("smlChildTableName", name) == 0) {
|
||||||
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
|
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
|
||||||
} else if (strcasecmp("smlAutoChildTableNameDelimiter", name) == 0) {
|
} else if (strcasecmp("smlAutoChildTableNameDelimiter", name) == 0) {
|
||||||
tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str, TSDB_TABLE_NAME_LEN);
|
tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str,
|
||||||
|
TSDB_TABLE_NAME_LEN);
|
||||||
} else if (strcasecmp("smlTagName", name) == 0) {
|
} else if (strcasecmp("smlTagName", name) == 0) {
|
||||||
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
||||||
// } else if (strcasecmp("smlDataFormat", name) == 0) {
|
// } else if (strcasecmp("smlDataFormat", name) == 0) {
|
||||||
|
@ -1708,6 +1716,13 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (strcasecmp(option, "s3UploadDelaySec") == 0) {
|
||||||
|
int32_t newS3UploadDelaysec = atoi(value);
|
||||||
|
uInfo("s3UploadDelaySec set from %d to %d", tsS3UploadDelaySec, newS3UploadDelaysec);
|
||||||
|
tsS3UploadDelaySec = newS3UploadDelaysec;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (strcasecmp(option, "ttlPushInterval") == 0) {
|
if (strcasecmp(option, "ttlPushInterval") == 0) {
|
||||||
int32_t newTtlPushInterval = atoi(value);
|
int32_t newTtlPushInterval = atoi(value);
|
||||||
uInfo("ttlPushInterval set from %d to %d", tsTtlPushIntervalSec, newTtlPushInterval);
|
uInfo("ttlPushInterval set from %d to %d", tsTtlPushIntervalSec, newTtlPushInterval);
|
||||||
|
|
|
@ -27,6 +27,7 @@ extern "C" {
|
||||||
extern int8_t tsS3Enabled;
|
extern int8_t tsS3Enabled;
|
||||||
extern int32_t tsS3BlockSize;
|
extern int32_t tsS3BlockSize;
|
||||||
extern int32_t tsS3BlockCacheSize;
|
extern int32_t tsS3BlockCacheSize;
|
||||||
|
extern int32_t tsS3UploadDelaySec;
|
||||||
|
|
||||||
int32_t s3Init();
|
int32_t s3Init();
|
||||||
void s3CleanUp();
|
void s3CleanUp();
|
||||||
|
|
|
@ -46,6 +46,7 @@ typedef struct {
|
||||||
STFileSet *fset;
|
STFileSet *fset;
|
||||||
TABLEID tbid[1];
|
TABLEID tbid[1];
|
||||||
bool hasTSData;
|
bool hasTSData;
|
||||||
|
bool skipTsRow;
|
||||||
} ctx[1];
|
} ctx[1];
|
||||||
|
|
||||||
// reader
|
// reader
|
||||||
|
@ -127,18 +128,18 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
extern int8_t tsS3Enabled;
|
extern int8_t tsS3Enabled;
|
||||||
|
|
||||||
int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs);
|
int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs);
|
||||||
bool skipRow = false;
|
committer->ctx->skipTsRow = false;
|
||||||
if (tsS3Enabled && nlevel > 1 && committer->ctx->did.level == nlevel - 1) {
|
if (tsS3Enabled && nlevel > 1 && committer->ctx->did.level == nlevel - 1) {
|
||||||
skipRow = true;
|
committer->ctx->skipTsRow = true;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
int64_t ts = TSDBROW_TS(&row->row);
|
int64_t ts = TSDBROW_TS(&row->row);
|
||||||
|
|
||||||
if (skipRow && ts <= committer->ctx->maxKey) {
|
if (committer->ctx->skipTsRow && ts <= committer->ctx->maxKey) {
|
||||||
ts = committer->ctx->maxKey + 1;
|
ts = committer->ctx->maxKey + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -397,6 +398,33 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
|
||||||
// reset nextKey
|
// reset nextKey
|
||||||
committer->ctx->nextKey = TSKEY_MAX;
|
committer->ctx->nextKey = TSKEY_MAX;
|
||||||
|
|
||||||
|
committer->ctx->skipTsRow = false;
|
||||||
|
|
||||||
|
extern int8_t tsS3Enabled;
|
||||||
|
extern int32_t tsS3UploadDelaySec;
|
||||||
|
long s3Size(const char *object_name);
|
||||||
|
int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs);
|
||||||
|
committer->ctx->skipTsRow = false;
|
||||||
|
if (tsS3Enabled && nlevel > 1 /* && committer->ctx->did.level == nlevel - 1*/) {
|
||||||
|
STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA];
|
||||||
|
if (committer->ctx->fset && fobj) {
|
||||||
|
// if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay
|
||||||
|
const char *object_name = taosDirEntryBaseName((char *)fobj->fname);
|
||||||
|
|
||||||
|
if (taosCheckExistFile(fobj->fname)) {
|
||||||
|
int32_t mtime = 0;
|
||||||
|
taosStatFile(fobj->fname, NULL, &mtime, NULL);
|
||||||
|
if (mtime < committer->ctx->now - tsS3UploadDelaySec) {
|
||||||
|
committer->ctx->skipTsRow = true;
|
||||||
|
}
|
||||||
|
} else if (s3Size(object_name) > 0) {
|
||||||
|
committer->ctx->skipTsRow = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// new fset can be written with ts data
|
||||||
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
|
|
|
@ -131,6 +131,7 @@ static int32_t tsdbWriteFilePage(STsdbFD *pFD) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pFD->s3File) {
|
if (pFD->s3File) {
|
||||||
|
tsdbWarn("%s file:%d", __func__, pFD->path);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
if (pFD->pgno > 0) {
|
if (pFD->pgno > 0) {
|
||||||
|
@ -286,6 +287,7 @@ int32_t tsdbFsyncFile(STsdbFD *pFD) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (pFD->s3File) {
|
if (pFD->s3File) {
|
||||||
|
tsdbWarn("%s file:%d", __func__, pFD->path);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
code = tsdbWriteFilePage(pFD);
|
code = tsdbWriteFilePage(pFD);
|
||||||
|
|
|
@ -304,7 +304,8 @@ static int32_t tsdbDoRetention2(void *arg) {
|
||||||
if (fobj == NULL) continue;
|
if (fobj == NULL) continue;
|
||||||
|
|
||||||
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
|
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
|
||||||
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) {
|
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1 &&
|
||||||
|
!taosCheckExistFile(fobj->fname)) {
|
||||||
code = tsdbRemoveFileObjectS3(rtner, fobj);
|
code = tsdbRemoveFileObjectS3(rtner, fobj);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
} else {
|
} else {
|
||||||
|
@ -335,28 +336,40 @@ static int32_t tsdbDoRetention2(void *arg) {
|
||||||
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) {
|
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) {
|
||||||
if (fobj == NULL) continue;
|
if (fobj == NULL) continue;
|
||||||
|
|
||||||
if (fobj->f->did.level == did.level) continue;
|
|
||||||
|
|
||||||
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
|
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
|
||||||
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) {
|
|
||||||
code = tsdbMigrateDataFileS3(rtner, fobj, &did);
|
if (fobj->f->did.level == did.level) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1 &&
|
||||||
} else {
|
taosCheckExistFile(fobj->fname)) {
|
||||||
/*
|
int32_t mtime = 0;
|
||||||
if (tsS3Enabled) {
|
taosStatFile(fobj->fname, NULL, &mtime, NULL);
|
||||||
int64_t fsize = 0;
|
if (mtime < rtner->now - tsS3UploadDelaySec) {
|
||||||
if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) {
|
code = tsdbMigrateDataFileS3(rtner, fobj, &did);
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
|
||||||
tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode), __func__,
|
|
||||||
fobj->fname, tstrerror(code));
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
s3EvictCache(fobj->fname, fsize * 2);
|
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
|
continue;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
|
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) {
|
||||||
|
code = tsdbMigrateDataFileS3(rtner, fobj, &did);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
} else {
|
||||||
|
|
||||||
|
if (tsS3Enabled) {
|
||||||
|
int64_t fsize = 0;
|
||||||
|
if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(terrno);
|
||||||
|
tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode),
|
||||||
|
__func__, fobj->fname, tstrerror(code)); TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
s3EvictCache(fobj->fname, fsize * 2);
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
//}
|
||||||
}
|
}
|
||||||
|
|
||||||
// stt
|
// stt
|
||||||
|
|
Loading…
Reference in New Issue