diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 0a155f4ea1..a170e5436e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -120,7 +120,7 @@ char tsSmlTsDefaultName[TSDB_COL_NAME_LEN] = "_ts"; char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null"; char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table name can be specified in tag value. char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = ""; - // If set to empty system will generate table name using MD5 hash. +// If set to empty system will generate table name using MD5 hash. // true means that the name and order of cols in each line are the same(only for influx protocol) // bool tsSmlDataFormat = false; // int32_t tsSmlBatchSize = 10000; @@ -273,6 +273,7 @@ char tsS3Hostname[TSDB_FQDN_LEN] = ""; int32_t tsS3BlockSize = 4096; // number of tsdb pages int32_t tsS3BlockCacheSize = 16; // number of blocks +int32_t tsS3UploadDelaySec = 60 * 60; int32_t tsCheckpointInterval = 300; @@ -453,7 +454,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, CFG_SCOPE_CLIENT) != 0) return -1; if (cfgAddBool(pCfg, "keepColumnName", tsKeepColumnName, CFG_SCOPE_CLIENT) != 0) return -1; if (cfgAddString(pCfg, "smlChildTableName", tsSmlChildTableName, CFG_SCOPE_CLIENT) != 0) return -1; - if (cfgAddString(pCfg, "smlAutoChildTableNameDelimiter", tsSmlAutoChildTableNameDelimiter, CFG_SCOPE_CLIENT) != 0) return -1; + if (cfgAddString(pCfg, "smlAutoChildTableNameDelimiter", tsSmlAutoChildTableNameDelimiter, CFG_SCOPE_CLIENT) != 0) + return -1; if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, CFG_SCOPE_CLIENT) != 0) return -1; if (cfgAddString(pCfg, "smlTsDefaultName", tsSmlTsDefaultName, CFG_SCOPE_CLIENT) != 0) return -1; if (cfgAddBool(pCfg, "smlDot2Underline", tsSmlDot2Underline, CFG_SCOPE_CLIENT) != 0) return -1; @@ -665,7 +667,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointTickInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointTickInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) + return -1; if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1; @@ -688,6 +691,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, 2048, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 60 * 10, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER) != 0) + return -1; // min free disk space used to check if the disk is full [50MB, 1GB] if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024, @@ -946,7 +951,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { return -1; } - tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str, TSDB_TABLE_NAME_LEN); + tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str, + TSDB_TABLE_NAME_LEN); tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN); tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN); tstrncpy(tsSmlTsDefaultName, cfgGetItem(pCfg, "smlTsDefaultName")->str, TSDB_COL_NAME_LEN); @@ -1116,6 +1122,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32; tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32; + tsS3UploadDelaySec = cfgGetItem(pCfg, "s3UploadDelaySec")->i32; GRANT_CFG_GET; return 0; @@ -1413,7 +1420,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) { } else if (strcasecmp("smlChildTableName", name) == 0) { tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN); } else if (strcasecmp("smlAutoChildTableNameDelimiter", name) == 0) { - tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str, TSDB_TABLE_NAME_LEN); + tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str, + TSDB_TABLE_NAME_LEN); } else if (strcasecmp("smlTagName", name) == 0) { tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN); // } else if (strcasecmp("smlDataFormat", name) == 0) { @@ -1708,6 +1716,13 @@ void taosCfgDynamicOptions(const char *option, const char *value) { return; } + if (strcasecmp(option, "s3UploadDelaySec") == 0) { + int32_t newS3UploadDelaysec = atoi(value); + uInfo("s3UploadDelaySec set from %d to %d", tsS3UploadDelaySec, newS3UploadDelaysec); + tsS3UploadDelaySec = newS3UploadDelaysec; + return; + } + if (strcasecmp(option, "ttlPushInterval") == 0) { int32_t newTtlPushInterval = atoi(value); uInfo("ttlPushInterval set from %d to %d", tsTtlPushIntervalSec, newTtlPushInterval); diff --git a/source/dnode/vnode/src/inc/vndCos.h b/source/dnode/vnode/src/inc/vndCos.h index bb4d284f0e..6612b5c653 100644 --- a/source/dnode/vnode/src/inc/vndCos.h +++ b/source/dnode/vnode/src/inc/vndCos.h @@ -27,6 +27,7 @@ extern "C" { extern int8_t tsS3Enabled; extern int32_t tsS3BlockSize; extern int32_t tsS3BlockCacheSize; +extern int32_t tsS3UploadDelaySec; int32_t s3Init(); void s3CleanUp(); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 6dc492f420..c77c444125 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -46,6 +46,7 @@ typedef struct { STFileSet *fset; TABLEID tbid[1]; bool hasTSData; + bool skipTsRow; } ctx[1]; // reader @@ -127,18 +128,18 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { continue; } } - + /* extern int8_t tsS3Enabled; int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs); - bool skipRow = false; + committer->ctx->skipTsRow = false; if (tsS3Enabled && nlevel > 1 && committer->ctx->did.level == nlevel - 1) { - skipRow = true; + committer->ctx->skipTsRow = true; } - + */ int64_t ts = TSDBROW_TS(&row->row); - if (skipRow && ts <= committer->ctx->maxKey) { + if (committer->ctx->skipTsRow && ts <= committer->ctx->maxKey) { ts = committer->ctx->maxKey + 1; } @@ -397,6 +398,33 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { // reset nextKey committer->ctx->nextKey = TSKEY_MAX; + committer->ctx->skipTsRow = false; + + extern int8_t tsS3Enabled; + extern int32_t tsS3UploadDelaySec; + long s3Size(const char *object_name); + int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs); + committer->ctx->skipTsRow = false; + if (tsS3Enabled && nlevel > 1 /* && committer->ctx->did.level == nlevel - 1*/) { + STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA]; + if (committer->ctx->fset && fobj) { + // if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay + const char *object_name = taosDirEntryBaseName((char *)fobj->fname); + + if (taosCheckExistFile(fobj->fname)) { + int32_t mtime = 0; + taosStatFile(fobj->fname, NULL, &mtime, NULL); + if (mtime < committer->ctx->now - tsS3UploadDelaySec) { + committer->ctx->skipTsRow = true; + } + } else if (s3Size(object_name) > 0) { + committer->ctx->skipTsRow = true; + } + } + + // new fset can be written with ts data + } + _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index f3bcfef703..6904249adb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -131,6 +131,7 @@ static int32_t tsdbWriteFilePage(STsdbFD *pFD) { } if (pFD->s3File) { + tsdbWarn("%s file:%d", __func__, pFD->path); return code; } if (pFD->pgno > 0) { @@ -286,6 +287,7 @@ int32_t tsdbFsyncFile(STsdbFD *pFD) { int32_t code = 0; if (pFD->s3File) { + tsdbWarn("%s file:%d", __func__, pFD->path); return code; } code = tsdbWriteFilePage(pFD); diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 8fb7648c99..4a5c8300cd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -304,7 +304,8 @@ static int32_t tsdbDoRetention2(void *arg) { if (fobj == NULL) continue; int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs); - if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) { + if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1 && + !taosCheckExistFile(fobj->fname)) { code = tsdbRemoveFileObjectS3(rtner, fobj); TSDB_CHECK_CODE(code, lino, _exit); } else { @@ -335,28 +336,40 @@ static int32_t tsdbDoRetention2(void *arg) { for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) { if (fobj == NULL) continue; - if (fobj->f->did.level == did.level) continue; - int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs); - if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) { - code = tsdbMigrateDataFileS3(rtner, fobj, &did); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - /* - if (tsS3Enabled) { - int64_t fsize = 0; - if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) { - code = TAOS_SYSTEM_ERROR(terrno); - tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode), __func__, - fobj->fname, tstrerror(code)); + + if (fobj->f->did.level == did.level) { + if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1 && + taosCheckExistFile(fobj->fname)) { + int32_t mtime = 0; + taosStatFile(fobj->fname, NULL, &mtime, NULL); + if (mtime < rtner->now - tsS3UploadDelaySec) { + code = tsdbMigrateDataFileS3(rtner, fobj, &did); TSDB_CHECK_CODE(code, lino, _exit); } - s3EvictCache(fobj->fname, fsize * 2); } - */ - code = tsdbDoMigrateFileObj(rtner, fobj, &did); - TSDB_CHECK_CODE(code, lino, _exit); + + continue; } + /* + if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) { + code = tsdbMigrateDataFileS3(rtner, fobj, &did); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + + if (tsS3Enabled) { + int64_t fsize = 0; + if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(terrno); + tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode), + __func__, fobj->fname, tstrerror(code)); TSDB_CHECK_CODE(code, lino, _exit); + } + s3EvictCache(fobj->fname, fsize * 2); + } + */ + code = tsdbDoMigrateFileObj(rtner, fobj, &did); + TSDB_CHECK_CODE(code, lino, _exit); + //} } // stt