From 8c6f452f09d3bc9fe66915d7415bfac553628c52 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 27 Oct 2023 15:51:52 +0800 Subject: [PATCH 01/25] tsdb/write: make last tier writable to mem --- source/dnode/vnode/src/tsdb/tsdbWrite.c | 6 +++--- source/dnode/vnode/src/vnd/vnodeSvr.c | 16 ++++++++-------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 5949b103d5..1e6526da48 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -80,8 +80,8 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) { TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2; TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision]; int32_t size = taosArrayGetSize(pMsg->aSubmitTbData); - int32_t nlevel = tfsGetLevel(pTsdb->pVnode->pTfs); - + /* + int32_t nlevel = tfsGetLevel(pTsdb->pVnode->pTfs); if (nlevel > 1 && tsS3Enabled) { if (nlevel == 3) { minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep1; @@ -89,7 +89,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) { minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep0; } } - + */ for (int32_t i = 0; i < size; ++i) { SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i); if (pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index c46ea15111..ed3b9460a6 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -13,13 +13,13 @@ * along with this program. If not, see . */ +#include "audit.h" #include "tencode.h" #include "tmsg.h" #include "vnd.h" #include "vndCos.h" #include "vnode.h" #include "vnodeInt.h" -#include "audit.h" static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); @@ -177,7 +177,7 @@ static int32_t vnodePreProcessDropTtlMsg(SVnode *pVnode, SRpcMsg *pMsg) { ttlReq.pTbUids = tbUids; } - { // prepare new content + { // prepare new content int32_t reqLenNew = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq); int32_t contLenNew = reqLenNew + sizeof(SMsgHead); @@ -262,8 +262,9 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int now *= 1000000; } - int32_t nlevel = tfsGetLevel(pVnode->pTfs); int32_t keep = pVnode->config.tsdbCfg.keep2; + /* + int32_t nlevel = tfsGetLevel(pVnode->pTfs); if (nlevel > 1 && tsS3Enabled) { if (nlevel == 3) { keep = pVnode->config.tsdbCfg.keep1; @@ -271,7 +272,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int keep = pVnode->config.tsdbCfg.keep0; } } - + */ TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * keep; TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision]; if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { @@ -584,7 +585,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg } } break; case TDMT_VND_STREAM_TASK_RESET: { - if (pVnode->restored/* && vnodeIsLeader(pVnode)*/) { + if (pVnode->restored /* && vnodeIsLeader(pVnode)*/) { tqProcessTaskResetReq(pVnode->pTq, pMsg); } } break; @@ -947,7 +948,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, taosArrayPush(rsp.pArray, &cRsp); - if(tsEnableAuditCreateTable){ + if (tsEnableAuditCreateTable) { int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId; SName name = {0}; @@ -1460,7 +1461,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP); SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP); for (int32_t iRow = 0; iRow < nRow; ++iRow) { - if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) { code = TSDB_CODE_INVALID_MSG; vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver); @@ -1569,7 +1569,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { code = terrno; vError("vgId:%d failed to create table:%s, code:%s", TD_VID(pVnode), pSubmitTbData->pCreateTbReq->name, - tstrerror(terrno)); + tstrerror(terrno)); goto _exit; } terrno = 0; From 4fede02da4b285940ddd8578f1ed485edf632458 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 30 Oct 2023 10:06:42 +0800 Subject: [PATCH 02/25] tsdb/retention: remove s3 evict cache hook --- source/dnode/vnode/src/tsdb/tsdbRetention.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index f2665dcf26..8fb7648c99 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -342,6 +342,7 @@ static int32_t tsdbDoRetention2(void *arg) { code = tsdbMigrateDataFileS3(rtner, fobj, &did); TSDB_CHECK_CODE(code, lino, _exit); } else { + /* if (tsS3Enabled) { int64_t fsize = 0; if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) { @@ -352,7 +353,7 @@ static int32_t tsdbDoRetention2(void *arg) { } s3EvictCache(fobj->fname, fsize * 2); } - + */ code = tsdbDoMigrateFileObj(rtner, fobj, &did); TSDB_CHECK_CODE(code, lino, _exit); } From 84d2f9c379b5299b874ec969759619bda512f273 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 30 Oct 2023 14:58:22 +0800 Subject: [PATCH 03/25] tsdb/upload: delay data file uploading --- source/common/src/tglobal.c | 25 ++++++++-- source/dnode/vnode/src/inc/vndCos.h | 1 + source/dnode/vnode/src/tsdb/tsdbCommit2.c | 38 ++++++++++++-- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 2 + source/dnode/vnode/src/tsdb/tsdbRetention.c | 49 ++++++++++++------- 5 files changed, 87 insertions(+), 28 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 0a155f4ea1..a170e5436e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -120,7 +120,7 @@ char tsSmlTsDefaultName[TSDB_COL_NAME_LEN] = "_ts"; char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null"; char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table name can be specified in tag value. char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = ""; - // If set to empty system will generate table name using MD5 hash. +// If set to empty system will generate table name using MD5 hash. // true means that the name and order of cols in each line are the same(only for influx protocol) // bool tsSmlDataFormat = false; // int32_t tsSmlBatchSize = 10000; @@ -273,6 +273,7 @@ char tsS3Hostname[TSDB_FQDN_LEN] = ""; int32_t tsS3BlockSize = 4096; // number of tsdb pages int32_t tsS3BlockCacheSize = 16; // number of blocks +int32_t tsS3UploadDelaySec = 60 * 60; int32_t tsCheckpointInterval = 300; @@ -453,7 +454,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, CFG_SCOPE_CLIENT) != 0) return -1; if (cfgAddBool(pCfg, "keepColumnName", tsKeepColumnName, CFG_SCOPE_CLIENT) != 0) return -1; if (cfgAddString(pCfg, "smlChildTableName", tsSmlChildTableName, CFG_SCOPE_CLIENT) != 0) return -1; - if (cfgAddString(pCfg, "smlAutoChildTableNameDelimiter", tsSmlAutoChildTableNameDelimiter, CFG_SCOPE_CLIENT) != 0) return -1; + if (cfgAddString(pCfg, "smlAutoChildTableNameDelimiter", tsSmlAutoChildTableNameDelimiter, CFG_SCOPE_CLIENT) != 0) + return -1; if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, CFG_SCOPE_CLIENT) != 0) return -1; if (cfgAddString(pCfg, "smlTsDefaultName", tsSmlTsDefaultName, CFG_SCOPE_CLIENT) != 0) return -1; if (cfgAddBool(pCfg, "smlDot2Underline", tsSmlDot2Underline, CFG_SCOPE_CLIENT) != 0) return -1; @@ -665,7 +667,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointTickInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointTickInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) + return -1; if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1; @@ -688,6 +691,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, 2048, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 60 * 10, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER) != 0) + return -1; // min free disk space used to check if the disk is full [50MB, 1GB] if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024, @@ -946,7 +951,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { return -1; } - tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str, TSDB_TABLE_NAME_LEN); + tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str, + TSDB_TABLE_NAME_LEN); tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN); tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN); tstrncpy(tsSmlTsDefaultName, cfgGetItem(pCfg, "smlTsDefaultName")->str, TSDB_COL_NAME_LEN); @@ -1116,6 +1122,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32; tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32; + tsS3UploadDelaySec = cfgGetItem(pCfg, "s3UploadDelaySec")->i32; GRANT_CFG_GET; return 0; @@ -1413,7 +1420,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) { } else if (strcasecmp("smlChildTableName", name) == 0) { tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN); } else if (strcasecmp("smlAutoChildTableNameDelimiter", name) == 0) { - tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str, TSDB_TABLE_NAME_LEN); + tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str, + TSDB_TABLE_NAME_LEN); } else if (strcasecmp("smlTagName", name) == 0) { tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN); // } else if (strcasecmp("smlDataFormat", name) == 0) { @@ -1708,6 +1716,13 @@ void taosCfgDynamicOptions(const char *option, const char *value) { return; } + if (strcasecmp(option, "s3UploadDelaySec") == 0) { + int32_t newS3UploadDelaysec = atoi(value); + uInfo("s3UploadDelaySec set from %d to %d", tsS3UploadDelaySec, newS3UploadDelaysec); + tsS3UploadDelaySec = newS3UploadDelaysec; + return; + } + if (strcasecmp(option, "ttlPushInterval") == 0) { int32_t newTtlPushInterval = atoi(value); uInfo("ttlPushInterval set from %d to %d", tsTtlPushIntervalSec, newTtlPushInterval); diff --git a/source/dnode/vnode/src/inc/vndCos.h b/source/dnode/vnode/src/inc/vndCos.h index bb4d284f0e..6612b5c653 100644 --- a/source/dnode/vnode/src/inc/vndCos.h +++ b/source/dnode/vnode/src/inc/vndCos.h @@ -27,6 +27,7 @@ extern "C" { extern int8_t tsS3Enabled; extern int32_t tsS3BlockSize; extern int32_t tsS3BlockCacheSize; +extern int32_t tsS3UploadDelaySec; int32_t s3Init(); void s3CleanUp(); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 6dc492f420..c77c444125 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -46,6 +46,7 @@ typedef struct { STFileSet *fset; TABLEID tbid[1]; bool hasTSData; + bool skipTsRow; } ctx[1]; // reader @@ -127,18 +128,18 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { continue; } } - + /* extern int8_t tsS3Enabled; int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs); - bool skipRow = false; + committer->ctx->skipTsRow = false; if (tsS3Enabled && nlevel > 1 && committer->ctx->did.level == nlevel - 1) { - skipRow = true; + committer->ctx->skipTsRow = true; } - + */ int64_t ts = TSDBROW_TS(&row->row); - if (skipRow && ts <= committer->ctx->maxKey) { + if (committer->ctx->skipTsRow && ts <= committer->ctx->maxKey) { ts = committer->ctx->maxKey + 1; } @@ -397,6 +398,33 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { // reset nextKey committer->ctx->nextKey = TSKEY_MAX; + committer->ctx->skipTsRow = false; + + extern int8_t tsS3Enabled; + extern int32_t tsS3UploadDelaySec; + long s3Size(const char *object_name); + int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs); + committer->ctx->skipTsRow = false; + if (tsS3Enabled && nlevel > 1 /* && committer->ctx->did.level == nlevel - 1*/) { + STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA]; + if (committer->ctx->fset && fobj) { + // if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay + const char *object_name = taosDirEntryBaseName((char *)fobj->fname); + + if (taosCheckExistFile(fobj->fname)) { + int32_t mtime = 0; + taosStatFile(fobj->fname, NULL, &mtime, NULL); + if (mtime < committer->ctx->now - tsS3UploadDelaySec) { + committer->ctx->skipTsRow = true; + } + } else if (s3Size(object_name) > 0) { + committer->ctx->skipTsRow = true; + } + } + + // new fset can be written with ts data + } + _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index f3bcfef703..6904249adb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -131,6 +131,7 @@ static int32_t tsdbWriteFilePage(STsdbFD *pFD) { } if (pFD->s3File) { + tsdbWarn("%s file:%d", __func__, pFD->path); return code; } if (pFD->pgno > 0) { @@ -286,6 +287,7 @@ int32_t tsdbFsyncFile(STsdbFD *pFD) { int32_t code = 0; if (pFD->s3File) { + tsdbWarn("%s file:%d", __func__, pFD->path); return code; } code = tsdbWriteFilePage(pFD); diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 8fb7648c99..4a5c8300cd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -304,7 +304,8 @@ static int32_t tsdbDoRetention2(void *arg) { if (fobj == NULL) continue; int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs); - if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) { + if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1 && + !taosCheckExistFile(fobj->fname)) { code = tsdbRemoveFileObjectS3(rtner, fobj); TSDB_CHECK_CODE(code, lino, _exit); } else { @@ -335,28 +336,40 @@ static int32_t tsdbDoRetention2(void *arg) { for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) { if (fobj == NULL) continue; - if (fobj->f->did.level == did.level) continue; - int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs); - if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) { - code = tsdbMigrateDataFileS3(rtner, fobj, &did); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - /* - if (tsS3Enabled) { - int64_t fsize = 0; - if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) { - code = TAOS_SYSTEM_ERROR(terrno); - tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode), __func__, - fobj->fname, tstrerror(code)); + + if (fobj->f->did.level == did.level) { + if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1 && + taosCheckExistFile(fobj->fname)) { + int32_t mtime = 0; + taosStatFile(fobj->fname, NULL, &mtime, NULL); + if (mtime < rtner->now - tsS3UploadDelaySec) { + code = tsdbMigrateDataFileS3(rtner, fobj, &did); TSDB_CHECK_CODE(code, lino, _exit); } - s3EvictCache(fobj->fname, fsize * 2); } - */ - code = tsdbDoMigrateFileObj(rtner, fobj, &did); - TSDB_CHECK_CODE(code, lino, _exit); + + continue; } + /* + if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) { + code = tsdbMigrateDataFileS3(rtner, fobj, &did); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + + if (tsS3Enabled) { + int64_t fsize = 0; + if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(terrno); + tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode), + __func__, fobj->fname, tstrerror(code)); TSDB_CHECK_CODE(code, lino, _exit); + } + s3EvictCache(fobj->fname, fsize * 2); + } + */ + code = tsdbDoMigrateFileObj(rtner, fobj, &did); + TSDB_CHECK_CODE(code, lino, _exit); + //} } // stt From 800cbfbb056a763ed6ffac35d0a1d98cab6bbf32 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 30 Oct 2023 17:36:49 +0800 Subject: [PATCH 04/25] tsdb/file: new s3flag in mem --- source/dnode/mnode/impl/src/mndDnode.c | 34 ++++++++++++++++++++- source/dnode/vnode/src/tsdb/tsdbCommit2.c | 5 ++- source/dnode/vnode/src/tsdb/tsdbFile2.c | 1 + source/dnode/vnode/src/tsdb/tsdbFile2.h | 1 + source/dnode/vnode/src/tsdb/tsdbRetention.c | 2 ++ 5 files changed, 39 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index b53dee7bff..50e5ec46b8 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -720,7 +720,7 @@ static int32_t mndProcessNotifyReq(SRpcMsg *pReq) { mndReleaseVgroup(pMnode, pVgroup); } } - mndUpdClusterInfo(pReq); + mndUpdClusterInfo(pReq); _OVER: tFreeSNotifyReq(¬ifyReq); return code; @@ -1235,6 +1235,38 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { strcpy(dcfgReq.config, "monitor"); snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); + } else if (strncasecmp(cfgReq.config, "s3blockcachesize", 16) == 0) { + int32_t optLen = strlen("s3blockcachesize"); + int32_t flag = -1; + int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag); + if (code < 0) return code; + + if (flag < 4 || flag > 1024 * 1024) { + mError("dnode:%d, failed to config s3BlockCacheSize since value:%d. Valid range: [4, 1024 * 1024]", + cfgReq.dnodeId, flag); + terrno = TSDB_CODE_INVALID_CFG; + tFreeSMCfgDnodeReq(&cfgReq); + return -1; + } + + strcpy(dcfgReq.config, "s3blockcachesize"); + snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); + } else if (strncasecmp(cfgReq.config, "s3uploaddelaysec", 16) == 0) { + int32_t optLen = strlen("s3uploaddelaysec"); + int32_t flag = -1; + int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag); + if (code < 0) return code; + + if (flag < 600 || flag > 60 * 60 * 24 * 30) { + mError("dnode:%d, failed to config s3UploadDelaySec since value:%d. Valid range: [600, 60 * 60 * 24 * 30]", + cfgReq.dnodeId, flag); + terrno = TSDB_CODE_INVALID_CFG; + tFreeSMCfgDnodeReq(&cfgReq); + return -1; + } + + strcpy(dcfgReq.config, "s3uploaddelaysec"); + snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); } else if (strncasecmp(cfgReq.config, "ttlpushinterval", 14) == 0) { int32_t optLen = strlen("ttlpushinterval"); int32_t flag = -1; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index c77c444125..5357c52723 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -405,9 +405,9 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { long s3Size(const char *object_name); int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs); committer->ctx->skipTsRow = false; - if (tsS3Enabled && nlevel > 1 /* && committer->ctx->did.level == nlevel - 1*/) { + if (tsS3Enabled && nlevel > 1 && committer->ctx->fset) { STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA]; - if (committer->ctx->fset && fobj) { + if (fobj && fobj->f->did.level == nlevel - 1) { // if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay const char *object_name = taosDirEntryBaseName((char *)fobj->fname); @@ -421,7 +421,6 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { committer->ctx->skipTsRow = true; } } - // new fset can be written with ts data } diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.c b/source/dnode/vnode/src/tsdb/tsdbFile2.c index 963c5bad34..9edb03d35b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.c @@ -303,6 +303,7 @@ bool tsdbIsSameTFile(const STFile *f1, const STFile *f2) { if (f1->did.id != f2->did.id) return false; if (f1->fid != f2->fid) return false; if (f1->cid != f2->cid) return false; + if (f1->s3flag != f2->s3flag) return false; return true; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.h b/source/dnode/vnode/src/tsdb/tsdbFile2.h index 33d8ac5478..9da198c1f0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.h @@ -58,6 +58,7 @@ int32_t tsdbTFileObjCmpr(const STFileObj **fobj1, const STFileObj **fobj2); struct STFile { tsdb_ftype_t type; SDiskID did; // disk id + int32_t s3flag; int32_t fid; // file id int64_t cid; // commit id int64_t size; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 4a5c8300cd..cfeca531fc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -211,6 +211,8 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, const }, }; + op.nf.s3flag = true; + code = TARRAY2_APPEND(rtner->fopArr, op); TSDB_CHECK_CODE(code, lino, _exit); From 867ce92dcc9f0c8e58591183a0672c442e3ec9ff Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 30 Oct 2023 19:28:18 +0800 Subject: [PATCH 05/25] tsdb/file-rw: fix log msg --- source/dnode/vnode/src/tsdb/tsdbReaderWriter.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 6904249adb..65f0f1b94d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -131,7 +131,7 @@ static int32_t tsdbWriteFilePage(STsdbFD *pFD) { } if (pFD->s3File) { - tsdbWarn("%s file:%d", __func__, pFD->path); + tsdbWarn("%s file: %s", __func__, pFD->path); return code; } if (pFD->pgno > 0) { @@ -287,7 +287,7 @@ int32_t tsdbFsyncFile(STsdbFD *pFD) { int32_t code = 0; if (pFD->s3File) { - tsdbWarn("%s file:%d", __func__, pFD->path); + tsdbWarn("%s file: %s", __func__, pFD->path); return code; } code = tsdbWriteFilePage(pFD); From a61502411df454c474ff2f70aadb4d311fc970bc Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 1 Nov 2023 10:49:05 +0800 Subject: [PATCH 06/25] tsdb/pg-cache: new page cache for tsdb s3 read file page --- source/dnode/vnode/src/inc/tsdb.h | 16 ++- source/dnode/vnode/src/tsdb/tsdbCache.c | 79 +++++++++++- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 117 +++++++++++++++++- 3 files changed, 200 insertions(+), 12 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 79112babc3..857aef78dc 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -382,6 +382,8 @@ struct STsdb { TdThreadMutex biMutex; SLRUCache *bCache; TdThreadMutex bMutex; + SLRUCache *pgCache; + TdThreadMutex pgMutex; struct STFileSystem *pFS; // new SRocksCache rCache; }; @@ -677,9 +679,9 @@ typedef struct STSnapRange STSnapRange; typedef TARRAY2(STSnapRange *) TSnapRangeArray; // disjoint snap ranges // util -int32_t tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR); -int32_t tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR); -void tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap); +int32_t tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR); +int32_t tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR); +void tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap); SHashObj *tsdbGetSnapRangeHash(TSnapRangeArray *pRanges); // snap partition list @@ -873,8 +875,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); -void tMergeTreePinSttBlock(SMergeTree* pMTree); -void tMergeTreeUnpinSttBlock(SMergeTree* pMTree); +void tMergeTreePinSttBlock(SMergeTree *pMTree); +void tMergeTreeUnpinSttBlock(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree); @@ -909,7 +911,9 @@ int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHa int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h); int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle); -int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h); +int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle); +int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage); +int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index ca3fb7027f..7cea469c34 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -87,6 +87,41 @@ static void tsdbCloseBCache(STsdb *pTsdb) { } } +static int32_t tsdbOpenPgCache(STsdb *pTsdb) { + int32_t code = 0; + // SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5); + int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; + + SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * szPage, 0, .5); + if (pCache == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + taosLRUCacheSetStrictCapacity(pCache, false); + + taosThreadMutexInit(&pTsdb->pgMutex, NULL); + +_err: + pTsdb->pgCache = pCache; + return code; +} + +static void tsdbClosePgCache(STsdb *pTsdb) { + SLRUCache *pCache = pTsdb->pgCache; + if (pCache) { + int32_t elems = taosLRUCacheGetElems(pCache); + tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems); + taosLRUCacheEraseUnrefEntries(pCache); + elems = taosLRUCacheGetElems(pCache); + tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems); + + taosLRUCacheCleanup(pCache); + + taosThreadMutexDestroy(&pTsdb->bMutex); + } +} + #define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t)) typedef struct { @@ -1191,6 +1226,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { goto _err; } + code = tsdbOpenPgCache(pTsdb); + if (code != TSDB_CODE_SUCCESS) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + code = tsdbOpenRocksCache(pTsdb); if (code != TSDB_CODE_SUCCESS) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1221,6 +1262,7 @@ void tsdbCloseCache(STsdb *pTsdb) { tsdbCloseBICache(pTsdb); tsdbCloseBCache(pTsdb); + tsdbClosePgCache(pTsdb); tsdbCloseRocksCache(pTsdb); } @@ -3057,7 +3099,6 @@ static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) { } */ int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage; - // int64_t size = 4096; code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock); if (code != TSDB_CODE_SUCCESS) { // taosMemoryFree(pBlock); @@ -3123,10 +3164,42 @@ int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) return code; } -int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h) { +int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) { int32_t code = 0; + char key[128] = {0}; + int keyLen = 0; - taosLRUCacheRelease(pCache, h, false); + getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen); + *handle = taosLRUCacheLookup(pCache, key, keyLen); + + return code; +} + +int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) { + int32_t code = 0; + char key[128] = {0}; + int keyLen = 0; + LRUHandle *handle = NULL; + + getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen); + taosThreadMutexLock(&pFD->pTsdb->pgMutex); + handle = taosLRUCacheLookup(pFD->pTsdb->pgCache, key, keyLen); + if (!handle) { + size_t charge = pFD->szPage; + _taos_lru_deleter_t deleter = deleteBCache; + uint8_t *pPg = taosMemoryMalloc(charge); + memcpy(pPg, pPage, charge); + + LRUStatus status = + taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, &handle, TAOS_LRU_PRIORITY_LOW, NULL); + if (status != TAOS_LRU_STATUS_OK) { + // ignore cache updating if not ok + // code = TSDB_CODE_OUT_OF_MEMORY; + } + } + taosThreadMutexUnlock(&pFD->pTsdb->pgMutex); + + tsdbCacheRelease(pFD->pTsdb->pgCache, handle); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 65f0f1b94d..def9a73d10 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -178,7 +178,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) { pFD->blkno = (pgno + tsS3BlockSize - 1) / tsS3BlockSize; code = tsdbCacheGetBlockS3(pFD->pTsdb->bCache, pFD, &handle); if (code != TSDB_CODE_SUCCESS || handle == NULL) { - tsdbBCacheRelease(pFD->pTsdb->bCache, handle); + tsdbCacheRelease(pFD->pTsdb->bCache, handle); if (code == TSDB_CODE_SUCCESS && !handle) { code = TSDB_CODE_OUT_OF_MEMORY; } @@ -190,7 +190,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) { int64_t blk_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage; memcpy(pFD->pBuf, pBlock + (offset - blk_offset), pFD->szPage); - tsdbBCacheRelease(pFD->pTsdb->bCache, handle); + tsdbCacheRelease(pFD->pTsdb->bCache, handle); } else { // seek int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET); @@ -254,7 +254,7 @@ _exit: return code; } -int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) { +static int32_t tsdbReadFileImp(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) { int32_t code = 0; int64_t n = 0; int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage); @@ -283,6 +283,117 @@ _exit: return code; } +static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) { + int32_t code = 0; + int64_t n = 0; + int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage); + int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage); + int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage); + int64_t bOffset = fOffset % pFD->szPage; + + ASSERT(bOffset < szPgCont); + + // 1, find pgnoStart & pgnoEnd to fetch from s3, if all pgs are local, no need to fetch + // 2, fetch pgnoStart ~ pgnoEnd from s3 + // 3, store pgs to pcache & last pg to pFD->pBuf + // 4, deliver pgs to [pBuf, pBuf + size) + + while (n < size) { + if (pFD->pgno != pgno) { + LRUHandle *handle = NULL; + code = tsdbCacheGetPageS3(pFD->pTsdb->pgCache, pFD, pgno, &handle); + if (code != TSDB_CODE_SUCCESS) { + if (handle) { + tsdbCacheRelease(pFD->pTsdb->pgCache, handle); + } + goto _exit; + } + + if (!handle) { + break; + } + + uint8_t *pPage = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->pgCache, handle); + memcpy(pFD->pBuf, pPage, pFD->szPage); + tsdbCacheRelease(pFD->pTsdb->pgCache, handle); + + // check + if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _exit; + } + + pFD->pgno = pgno; + } + + int64_t nRead = TMIN(szPgCont - bOffset, size - n); + memcpy(pBuf + n, pFD->pBuf + bOffset, nRead); + + n += nRead; + pgno++; + bOffset = 0; + } + + if (n < size) { + // 2, retrieve pgs from s3 + uint8_t *pBlock = NULL; + int64_t retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage); + int64_t pgnoEnd = pgno - 1 + (size - n + szPgCont - 1) / szPgCont; + int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage; + code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, &pBlock); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } + + // 3, Store Pages in Cache + int nPage = pgnoEnd - pgno + 1; + for (int i = 0; i < nPage; ++i) { + tsdbCacheSetPageS3(pFD->pTsdb->pgCache, pFD, pgno, pBlock + i * pFD->szPage); + + memcpy(pFD->pBuf, pBlock + i * pFD->szPage, pFD->szPage); + + // check + if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _exit; + } + + pFD->pgno = pgno; + + int64_t nRead = TMIN(szPgCont - bOffset, size - n); + memcpy(pBuf + n, pFD->pBuf + bOffset, nRead); + + n += nRead; + pgno++; + bOffset = 0; + } + + taosMemoryFree(pBlock); + } + +_exit: + return code; +} + +int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) { + int32_t code = 0; + if (!pFD->pFD) { + code = tsdbOpenFileImpl(pFD); + if (code) { + goto _exit; + } + } + + if (pFD->s3File && tsS3BlockSize < 0) { + return tsdbReadFileS3(pFD, offset, pBuf, size); + } else { + return tsdbReadFileImp(pFD, offset, pBuf, size); + } + +_exit: + return code; +} + int32_t tsdbFsyncFile(STsdbFD *pFD) { int32_t code = 0; From 221afff9eefdbde51126cc080375b356c092145c Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 1 Nov 2023 11:00:47 +0800 Subject: [PATCH 07/25] vnode/write: remove keep modify code block --- source/dnode/vnode/src/vnd/vnodeSvr.c | 46 +++++++++++++-------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b3675380c7..b31463ac00 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -16,12 +16,11 @@ #include "audit.h" #include "tencode.h" #include "tmsg.h" +#include "tstrbuild.h" #include "vnd.h" #include "vndCos.h" #include "vnode.h" #include "vnodeInt.h" -#include "audit.h" -#include "tstrbuild.h" static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); @@ -275,6 +274,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int } } */ + TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * keep; TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision]; if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { @@ -906,7 +906,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp)); tbUids = taosArrayInit(req.nReqs, sizeof(int64_t)); - tbNames = taosArrayInit(req.nReqs, sizeof(char*)); + tbNames = taosArrayInit(req.nReqs, sizeof(char *)); if (rsp.pArray == NULL || tbUids == NULL || tbNames == NULL) { rcode = -1; terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -952,8 +952,8 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, taosArrayPush(rsp.pArray, &cRsp); - if(tsEnableAuditCreateTable){ - char* str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); + if (tsEnableAuditCreateTable) { + char *str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); strcpy(str, pCreateReq->name); taosArrayPush(tbNames, &str); } @@ -978,24 +978,24 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen); tEncodeSVCreateTbBatchRsp(&encoder, &rsp); - if(tsEnableAuditCreateTable){ + if (tsEnableAuditCreateTable) { int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId; SName name = {0}; tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB); SStringBuilder sb = {0}; - for(int32_t iReq = 0; iReq < req.nReqs; iReq++){ - char** key = (char**)taosArrayGet(tbNames, iReq); + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { + char **key = (char **)taosArrayGet(tbNames, iReq); taosStringBuilderAppendStringLen(&sb, *key, strlen(*key)); - if(iReq < req.nReqs - 1){ + if (iReq < req.nReqs - 1) { taosStringBuilderAppendChar(&sb, ','); } taosMemoryFreeClear(*key); } - size_t len = 0; - char* keyJoined = taosStringBuilderGetResult(&sb, &len); + size_t len = 0; + char *keyJoined = taosStringBuilderGetResult(&sb, &len); auditRecord(NULL, clusterId, "createTable", name.dbname, "", keyJoined, len); @@ -1007,7 +1007,7 @@ _exit: pCreateReq = req.pReqs + iReq; taosMemoryFree(pCreateReq->sql); taosMemoryFree(pCreateReq->comment); - taosArrayDestroy(pCreateReq->ctb.tagName); + taosArrayDestroy(pCreateReq->ctb.tagName); } taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp); taosArrayDestroy(tbUids); @@ -1166,7 +1166,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in // process req tbUids = taosArrayInit(req.nReqs, sizeof(int64_t)); rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp)); - tbNames = taosArrayInit(req.nReqs, sizeof(char*)); + tbNames = taosArrayInit(req.nReqs, sizeof(char *)); if (tbUids == NULL || rsp.pArray == NULL || tbNames == NULL) goto _exit; for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { @@ -1188,9 +1188,9 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in } taosArrayPush(rsp.pArray, &dropTbRsp); - - if(tsEnableAuditCreateTable){ - char* str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); + + if (tsEnableAuditCreateTable) { + char *str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); strcpy(str, pDropTbReq->name); taosArrayPush(tbNames, &str); } @@ -1199,30 +1199,30 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in tqUpdateTbUidList(pVnode->pTq, tbUids, false); tdUpdateTbUidList(pVnode->pSma, pStore, false); - if(tsEnableAuditCreateTable){ + if (tsEnableAuditCreateTable) { int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId; SName name = {0}; tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB); SStringBuilder sb = {0}; - for(int32_t iReq = 0; iReq < req.nReqs; iReq++){ - char** key = (char**)taosArrayGet(tbNames, iReq); + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { + char **key = (char **)taosArrayGet(tbNames, iReq); taosStringBuilderAppendStringLen(&sb, *key, strlen(*key)); - if(iReq < req.nReqs - 1){ + if (iReq < req.nReqs - 1) { taosStringBuilderAppendChar(&sb, ','); } taosMemoryFreeClear(*key); } - size_t len = 0; - char* keyJoined = taosStringBuilderGetResult(&sb, &len); + size_t len = 0; + char *keyJoined = taosStringBuilderGetResult(&sb, &len); auditRecord(NULL, clusterId, "dropTable", name.dbname, "", keyJoined, len); taosStringBuilderDestroy(&sb); } - + _exit: taosArrayDestroy(tbUids); tdUidStoreFree(pStore); From 0f5a124cb1a43fa7d1ba17fa91a3cc43cc20bbfc Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 1 Nov 2023 11:08:39 +0800 Subject: [PATCH 08/25] config/block-size: enable dynamic config for debugging --- source/common/src/tglobal.c | 13 +++++++------ source/dnode/mnode/impl/src/mndDnode.c | 16 ++++++++++++++++ 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3d1a1ccb16..7cf1586abb 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -105,9 +105,9 @@ bool tsEnableAuditCreateTable = true; // telem #ifdef TD_ENTERPRISE -bool tsEnableTelem = false; +bool tsEnableTelem = false; #else -bool tsEnableTelem = true; +bool tsEnableTelem = true; #endif int32_t tsTelemInterval = 43200; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.tdengine.com"; @@ -115,9 +115,9 @@ uint16_t tsTelemPort = 80; char *tsTelemUri = "/report"; #ifdef TD_ENTERPRISE -bool tsEnableCrashReport = false; +bool tsEnableCrashReport = false; #else -bool tsEnableCrashReport = true; +bool tsEnableCrashReport = true; #endif char *tsClientCrashReportUri = "/ccrashreport"; char *tsSvrCrashReportUri = "/dcrashreport"; @@ -677,7 +677,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddFloat(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) + return -1; if (cfgAddString(pCfg, "lossyColumns", tsLossyColumns, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddFloat(pCfg, "fPrecision", tsFPrecision, 0.0f, 100000.0f, CFG_SCOPE_SERVER) != 0) return -1; @@ -695,7 +696,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, 2048, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -100, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 60 * 10, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER) != 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 6d77a48110..3b3e9eb03b 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -1228,6 +1228,22 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { strcpy(dcfgReq.config, "monitor"); snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); + } else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) { + int32_t optLen = strlen("s3blocksize"); + int32_t flag = -1; + int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag); + if (code < 0) return code; + + if (flag > 1024 * 1024) { + mError("dnode:%d, failed to config s3blocksize since value:%d. Valid range: [4, 1024 * 1024]", cfgReq.dnodeId, + flag); + terrno = TSDB_CODE_INVALID_CFG; + tFreeSMCfgDnodeReq(&cfgReq); + return -1; + } + + strcpy(dcfgReq.config, "s3blocksize"); + snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); } else if (strncasecmp(cfgReq.config, "s3blockcachesize", 16) == 0) { int32_t optLen = strlen("s3blockcachesize"); int32_t flag = -1; From 6f0fa5d47e99d1ffe16f1a5aa45053ce23700e8b Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 1 Nov 2023 14:56:57 +0800 Subject: [PATCH 09/25] tsdb/retention: remove incorrect right brace --- source/dnode/vnode/src/tsdb/tsdbRetention.c | 59 ++++++++++----------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 40561ddb42..0fc1e1b64b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -324,41 +324,40 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) { for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = fset->farr[ftype], 1); ++ftype) { if (fobj == NULL) continue; - int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs); + int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs); - if (fobj->f->did.level == did.level) { - if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1 && - taosCheckExistFile(fobj->fname)) { - int32_t mtime = 0; - taosStatFile(fobj->fname, NULL, &mtime, NULL); - if (mtime < rtner->now - tsS3UploadDelaySec) { - code = tsdbMigrateDataFileS3(rtner, fobj, &did); - TSDB_CHECK_CODE(code, lino, _exit); - } + if (fobj->f->did.level == did.level) { + if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1 && + taosCheckExistFile(fobj->fname)) { + int32_t mtime = 0; + taosStatFile(fobj->fname, NULL, &mtime, NULL); + if (mtime < rtner->now - tsS3UploadDelaySec) { + code = tsdbMigrateDataFileS3(rtner, fobj, &did); + TSDB_CHECK_CODE(code, lino, _exit); } - - continue; } - /* - if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) { - code = tsdbMigrateDataFileS3(rtner, fobj, &did); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - if (tsS3Enabled) { - int64_t fsize = 0; - if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) { - code = TAOS_SYSTEM_ERROR(terrno); - tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode), - __func__, fobj->fname, tstrerror(code)); TSDB_CHECK_CODE(code, lino, _exit); - } - s3EvictCache(fobj->fname, fsize * 2); - } - */ - code = tsdbDoMigrateFileObj(rtner, fobj, &did); - TSDB_CHECK_CODE(code, lino, _exit); - //} + continue; } + /* + if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) { + code = tsdbMigrateDataFileS3(rtner, fobj, &did); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + + if (tsS3Enabled) { + int64_t fsize = 0; + if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(terrno); + tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode), + __func__, fobj->fname, tstrerror(code)); TSDB_CHECK_CODE(code, lino, _exit); + } + s3EvictCache(fobj->fname, fsize * 2); + } + */ + code = tsdbDoMigrateFileObj(rtner, fobj, &did); + TSDB_CHECK_CODE(code, lino, _exit); + //} } // stt From 52fd71ce13f07ba1788893b228b562e5191d4352 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 1 Nov 2023 18:00:31 +0800 Subject: [PATCH 10/25] config/block-size: default to -1 to enable page cache --- source/common/src/tglobal.c | 7 ++++--- source/dnode/mnode/impl/src/mndDnode.c | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 7cf1586abb..485b67e1a7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -280,8 +280,8 @@ int8_t tsS3Enabled = false; int8_t tsS3Https = true; char tsS3Hostname[TSDB_FQDN_LEN] = ""; -int32_t tsS3BlockSize = 4096; // number of tsdb pages -int32_t tsS3BlockCacheSize = 16; // number of blocks +int32_t tsS3BlockSize = -1; // number of tsdb pages (4096) +int32_t tsS3BlockCacheSize = 1024; // number of blocks/pages (16) int32_t tsS3UploadDelaySec = 60 * 60; #ifndef _STORAGE @@ -697,7 +697,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -100, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER) != 0) + return -1; if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 60 * 10, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER) != 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 3b3e9eb03b..e2c21ed012 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -1250,7 +1250,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag); if (code < 0) return code; - if (flag < 4 || flag > 1024 * 1024) { + if (flag < 4 || flag > 1024 * 1024 * 1024) { mError("dnode:%d, failed to config s3BlockCacheSize since value:%d. Valid range: [4, 1024 * 1024]", cfgReq.dnodeId, flag); terrno = TSDB_CODE_INVALID_CFG; From b71a013b8d3627eee126f6a61d7b591bd7f3e252 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Wed, 1 Nov 2023 20:24:24 +0800 Subject: [PATCH 11/25] fix: nano seconds database error --- docs/en/12-taos-sql/10-function.md | 6 ++- docs/zh/12-taos-sql/10-function.md | 4 +- include/util/taoserror.h | 3 +- source/common/src/ttime.c | 2 +- source/common/test/commonTests.cpp | 2 +- source/libs/function/src/builtins.c | 2 +- source/libs/scalar/src/sclfunc.c | 2 +- source/util/src/terror.c | 3 +- .../2-query/func_to_char_timestamp.py | 38 +++++++++++++++++++ 9 files changed, 53 insertions(+), 9 deletions(-) diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md index 18c7ffc345..5be14093de 100644 --- a/docs/en/12-taos-sql/10-function.md +++ b/docs/en/12-taos-sql/10-function.md @@ -539,7 +539,8 @@ TO_CHAR(ts, format_str_literal) - When `ms`,`us`,`ns` are used in `to_char`, like `to_char(ts, 'yyyy-mm-dd hh:mi:ss.ms.us.ns')`, The time of `ms`,`us`,`ns` corresponds to the same fraction seconds. When ts is `1697182085123`, the output of `ms` is `123`, `us` is `123000`, `ns` is `123000000`. - If we want to output some characters of format without converting, surround it with double quotes. `to_char(ts, 'yyyy-mm-dd "is formated by yyyy-mm-dd"')`. If want to output double quotes, add a back slash before double quote, like `to_char(ts, '\"yyyy-mm-dd\"')` will output `"2023-10-10"`. - For formats that output digits, the uppercase and lowercase formats are the same. -- It's recommended to put time zone in the format, if not, the default time zone zone will be that in server or client. +- It's recommended to put time zone in the format, if not, the default time zone will be that in server or client. +- The precision of the input timestamp will be recognized automatically according to the precision of the table used. #### TO_TIMESTAMP @@ -564,9 +565,10 @@ TO_TIMESTAMP(ts_str_literal, format_str_literal) - The uppercase or lowercase of `MONTH`, `MON`, `DAY`, `DY` and formtas that output digits have same effect when used in `to_timestamp`, like `to_timestamp('2023-JANUARY-01', 'YYYY-month-dd')`, `month` can be replaced by `MONTH`, or `month`. The cases are ignored. - If multi times are specified for one component, the previous will be overwritten. Like `to_timestamp('2023-22-10-10', 'yyyy-yy-MM-dd')`, the output year will be `2022`. - To avoid unexpected time zone used during the convertion, it's recommended to put time zone in the ts string, e.g. '2023-10-10 10:10:10+08'. If time zone not specified, default will be that in server or client. -- The default timestamp if some components are not specified will be: `1970-01-01 00:00:00` with specified or default local timezone. +- The default timestamp if some components are not specified will be: `1970-01-01 00:00:00` with the timezone specified or default to local timezone. - If `AM` or `PM` is specified in formats, the Hour must between `1-12`. - In some cases, `to_timestamp` can convert correctly even the format and the timestamp string are not totally matched. Like `to_timetamp('200101/2', 'yyyyMM1/dd')`, the digit `1` in format string are ignored, and the output timestsamp is `2001-01-02 00:00:00`. Spaces and tabs in formats and tiemstamp string are also ignored automatically. +- The precision of the output timestamp will be the same as the table in SELECT stmt, millisecond will be used if no table is specified. The output of `select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns')` will be truncated to millisecond precision. If a nano precision table is specified, no truncation will be applied. Like `select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns') from db_ns.table_ns limit 1`. ### Time and Date Functions diff --git a/docs/zh/12-taos-sql/10-function.md b/docs/zh/12-taos-sql/10-function.md index 4371124623..723f299cbc 100644 --- a/docs/zh/12-taos-sql/10-function.md +++ b/docs/zh/12-taos-sql/10-function.md @@ -540,6 +540,7 @@ TO_CHAR(ts, format_str_literal) - 时间格式中无法匹配规则的内容会直接输出. 如果想要在格式串中指定某些能够匹配规则的部分不做转换, 可以使用双引号, 如`to_char(ts, 'yyyy-mm-dd "is formated by yyyy-mm-dd"')`. 如果想要输出双引号, 那么在双引号之前加一个反斜杠, 如 `to_char(ts, '\"yyyy-mm-dd\"')` 将会输出 `"2023-10-10"`. - 那些输出是数字的格式, 如`YYYY`, `DD`, 大写与小写意义相同, 即`yyyy` 和 `YYYY` 可以互换. - 推荐在时间格式中带时区信息,如果不带则默认输出的时区为服务端或客户端所配置的时区. +- 输入时间戳的精度由所查询表的精度确定. #### TO_TIMESTAMP @@ -560,13 +561,14 @@ TO_TIMESTAMP(ts_str_literal, format_str_literal) **支持的格式**: 与`to_char`相同 **使用说明**: -- 若`ms`, `us`, `ns`同时指定, 那么结果时间戳包含上述三个字段的和. 如 `to_timestamp('2023-10-10 10:10:10.123.000456.000000789', 'yyyy-mm-dd hh:mi:ss.ms.us.ns')` 输出是 `2023-10-10 10:10:10.123456789`. +- 若`ms`, `us`, `ns`同时指定, 那么结果时间戳包含上述三个字段的和. 如 `to_timestamp('2023-10-10 10:10:10.123.000456.000000789', 'yyyy-mm-dd hh:mi:ss.ms.us.ns')` 输出为 `2023-10-10 10:10:10.123456789`对应的时间戳. - `MONTH`, `MON`, `DAY`, `DY` 以及其他输出为数字的格式的大小写意义相同, 如 `to_timestamp('2023-JANUARY-01', 'YYYY-month-dd')`, `month`可以被替换为`MONTH` 或者`Month`. - 如果同一字段被指定了多次, 那么前面的指定将会被覆盖. 如 `to_timestamp('2023-22-10-10', 'yyyy-yy-MM-dd')`, 输出年份是`2022`. - 为避免转换时使用了非预期的时区,推荐在时间中携带时区信息,例如'2023-10-10 10:10:10+08',如果未指定时区则默认时区为服务端或客户端指定的时区。 - 如果没有指定完整的时间,那么默认时间值为指定或默认时区的 `1970-01-01 00:00:00`, 未指定部分使用该默认值中的对应部分. - 如果格式串中有`AM`, `PM`等, 那么小时必须是12小时制, 范围必须是01-12. - `to_timestamp`转换具有一定的容错机制, 在格式串和时间戳串不完全对应时, 有时也可转换, 如: `to_timestamp('200101/2', 'yyyyMM1/dd')`, 格式串中多出来的1会被丢弃. 格式串与时间戳串中多余的空格字符(空格, tab等)也会被 自动忽略. 如`to_timestamp(' 23 年 - 1 月 - 01 日 ', 'yy 年-MM月-dd日')` 可以被成功转换. 虽然`MM`等字段需要两个数字对应(只有一位时前面补0), 在`to_timestamp`时, 一个数字也可以成功转换. +- 输出时间戳的精度与查询表的精度相同, 若查询未指定表, 则输出精度为毫秒. 如`select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns')`的输出将会把微妙和纳秒进行截断. 如果指定一张纳秒表, 那么就不会发生截断, 如`select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns') from db_ns.table_ns limit 1`. ### 时间和日期函数 diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 8748ea99a2..00009bf6da 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -741,7 +741,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_FUNC_FUNTION_PARA_VALUE TAOS_DEF_ERROR_CODE(0, 0x2803) #define TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2804) #define TSDB_CODE_FUNC_DUP_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x2805) -#define TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED TAOS_DEF_ERROR_CODE(0, 0x2806) +#define TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_FORMAT_ERR TAOS_DEF_ERROR_CODE(0, 0x2806) +#define TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_TS_ERR TAOS_DEF_ERROR_CODE(0, 0x2807) //udf #define TSDB_CODE_UDF_STOPPING TAOS_DEF_ERROR_CODE(0, 0x2901) diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 6b5bb8680e..4b0848e5e9 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -1320,7 +1320,7 @@ static void tm2char(const SArray* formats, const struct STm* tm, char* s, int32_ s += 4; break; case TSFKW_DDD: - sprintf(s, "%d", tm->tm.tm_yday); + sprintf(s, "%03d", tm->tm.tm_yday + 1); s += strlen(s); break; case TSFKW_DD: diff --git a/source/common/test/commonTests.cpp b/source/common/test/commonTests.cpp index dc320ebcb2..c65d8761b7 100644 --- a/source/common/test/commonTests.cpp +++ b/source/common/test/commonTests.cpp @@ -344,7 +344,7 @@ TEST(timeTest, ts2char) { "day-\"日\"", TSDB_TIME_PRECISION_MILLI, "2023-023-23-3-2023-023-23-3-年-OCTOBER -OCT-October -Oct-october " - "-oct-月-285-13-6-285-13-6-FRIDAY -Friday -friday -日"); + "-oct-月-286-13-6-286-13-6-FRIDAY -Friday -friday -日"); #endif ts = 1697182085123L; // Friday, October 13, 2023 3:28:05.123 PM GMT+08:00 test_ts2char(ts, "HH24:hh24:HH12:hh12:HH:hh:MI:mi:SS:ss:MS:ms:US:us:NS:ns:PM:AM:pm:am", TSDB_TIME_PRECISION_MILLI, diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 84aff9fa88..fdbc0b4038 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -3315,7 +3315,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "to_timestamp", .type = FUNCTION_TYPE_TO_TIMESTAMP, - .classification = FUNC_MGT_SCALAR_FUNC, + .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC, .translateFunc = translateToTimestamp, .getEnvFunc = NULL, .initFunc = NULL, diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 48886b1eec..dbdd79cc65 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1230,7 +1230,7 @@ int32_t toTimestampFunction(SScalarParam* pInput, int32_t inputNum, SScalarParam code = taosChar2Ts(format, &formats, tsStr, &ts, precision, errMsg, 128); if (code) { qError("func to_timestamp failed %s", errMsg); - code = TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED; + code = code == -1 ? TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_FORMAT_ERR : TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_TS_ERR; break; } colDataSetVal(pOutput->columnData, i, (char *)&ts, false); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 30daad62bf..0ffa77e365 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -604,7 +604,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_FUNTION_PARA_TYPE, "Invalid function par TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid function para value") TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION, "Not buildin function") TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_DUP_TIMESTAMP, "Duplicate timestamps not allowed in function") -TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED, "Func to_timestamp failed, check log for detail") +TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_FORMAT_ERR, "Func to_timestamp failed, format mismatch") +TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_TS_ERR, "Func to_timestamp failed, wrong timestamp") //udf TAOS_DEFINE_ERROR(TSDB_CODE_UDF_STOPPING, "udf is stopping") diff --git a/tests/system-test/2-query/func_to_char_timestamp.py b/tests/system-test/2-query/func_to_char_timestamp.py index 639811d275..d955e00a82 100644 --- a/tests/system-test/2-query/func_to_char_timestamp.py +++ b/tests/system-test/2-query/func_to_char_timestamp.py @@ -166,6 +166,44 @@ class TDTestCase: def run(self): self.prepareTestEnv() self.test_to_timestamp() + self.test_ns_to_timestamp() + + def create_tables(self): + tdSql.execute("create database if not exists test_us precision 'us'") + tdSql.execute("create database if not exists test_ns precision 'ns'") + tdSql.execute("use test_us") + tdSql.execute(f"CREATE STABLE `meters_us` (`ts` TIMESTAMP, `ip_value` FLOAT, `ip_quality` INT, `ts2` timestamp) TAGS (`t1` INT)") + tdSql.execute(f"CREATE TABLE `ctb1_us` USING `meters_us` (`t1`) TAGS (1)") + tdSql.execute(f"CREATE TABLE `ctb2_us` USING `meters_us` (`t1`) TAGS (2)") + tdSql.execute("use test_ns") + tdSql.execute(f"CREATE STABLE `meters_ns` (`ts` TIMESTAMP, `ip_value` FLOAT, `ip_quality` INT, `ts2` timestamp) TAGS (`t1` INT)") + tdSql.execute(f"CREATE TABLE `ctb1_ns` USING `meters_ns` (`t1`) TAGS (1)") + tdSql.execute(f"CREATE TABLE `ctb2_ns` USING `meters_ns` (`t1`) TAGS (2)") + + def insert_ns_data(self): + tdLog.debug("start to insert data ............") + tdSql.execute(f"INSERT INTO `test_us`.`ctb1_us` VALUES ('2023-07-01 00:00:00.123456', 10.30000, 100, '2023-07-01 00:00:00.123456')") + tdSql.execute(f"INSERT INTO `test_us`.`ctb2_us` VALUES ('2023-08-01 00:00:00.123456', 20.30000, 200, '2023-07-01 00:00:00.123456')") + tdSql.execute(f"INSERT INTO `test_ns`.`ctb1_ns` VALUES ('2023-07-01 00:00:00.123456789', 10.30000, 100, '2023-07-01 00:00:00.123456000')") + tdSql.execute(f"INSERT INTO `test_ns`.`ctb2_ns` VALUES ('2023-08-01 00:00:00.123456789', 20.30000, 200, '2023-08-01 00:00:00.123456789')") + tdLog.debug("insert data ............ [OK]") + + def test_ns_to_timestamp(self): + self.create_tables() + self.insert_ns_data() + tdSql.query("select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns')", queryTimes=1) + tdSql.checkData(0, 0, 1690855810123) + tdSql.execute('use test_ns', queryTimes=1) + tdSql.query("select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns')", queryTimes=1) + tdSql.checkData(0, 0, 1690855810123) + tdSql.query("select to_char(ts2, 'yyyy-mm-dd hh:mi:ss.ns') from meters_ns", queryTimes=1) + tdSql.checkData(0, 0, '2023-07-01 12:00:00.123456000') + tdSql.checkData(1, 0, '2023-08-01 12:00:00.123456789') + + tdSql.query("select to_timestamp(to_char(ts2, 'yyyy-mm-dd hh:mi:ss.ns'), 'yyyy-mm-dd hh:mi:ss.ns') from meters_ns", queryTimes=1) + tdSql.checkData(0, 0, 1688140800123456000) + tdSql.checkData(1, 0, 1690819200123456789) + def stop(self): tdSql.close() From d0327e07b833bb3086a294ad44ba562e82e7c18f Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 2 Nov 2023 09:42:19 +0800 Subject: [PATCH 12/25] tsdb/pg-cache: use pgno for cache key instead of blkno --- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 7cea469c34..94e6152487 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -3169,7 +3169,7 @@ int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHan char key[128] = {0}; int keyLen = 0; - getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen); + getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen); *handle = taosLRUCacheLookup(pCache, key, keyLen); return code; From 46d3be1f08eeebee07e6d3c53eb05cbbc649e4e6 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 2 Nov 2023 10:10:03 +0800 Subject: [PATCH 13/25] config/page-cache-size: new parameter for pcache size --- source/common/src/tglobal.c | 17 +++++++++++++---- source/dnode/mnode/impl/src/mndDnode.c | 18 +++++++++++++++++- source/dnode/vnode/src/inc/vndCos.h | 1 + source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- 4 files changed, 32 insertions(+), 6 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 485b67e1a7..3c16554ac6 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -280,8 +280,9 @@ int8_t tsS3Enabled = false; int8_t tsS3Https = true; char tsS3Hostname[TSDB_FQDN_LEN] = ""; -int32_t tsS3BlockSize = -1; // number of tsdb pages (4096) -int32_t tsS3BlockCacheSize = 1024; // number of blocks/pages (16) +int32_t tsS3BlockSize = -1; // number of tsdb pages (4096) +int32_t tsS3BlockCacheSize = 16; // number of blocks +int32_t tsS3PageCacheSize = 1024; // number of pages int32_t tsS3UploadDelaySec = 60 * 60; #ifndef _STORAGE @@ -697,8 +698,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -100, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER) != 0) - return -1; + if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "s3PageCacheSize", tsS3PageCacheSize, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 60 * 10, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER) != 0) return -1; @@ -1132,6 +1133,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32; tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32; + tsS3PageCacheSize = cfgGetItem(pCfg, "s3PageCacheSize")->i32; tsS3UploadDelaySec = cfgGetItem(pCfg, "s3UploadDelaySec")->i32; GRANT_CFG_GET; @@ -1726,6 +1728,13 @@ void taosCfgDynamicOptions(const char *option, const char *value) { return; } + if (strcasecmp(option, "s3PageCacheSize") == 0) { + int32_t newS3PageCacheSize = atoi(value); + uInfo("s3PageCacheSize set from %d to %d", tsS3PageCacheSize, newS3PageCacheSize); + tsS3PageCacheSize = newS3PageCacheSize; + return; + } + if (strcasecmp(option, "s3UploadDelaySec") == 0) { int32_t newS3UploadDelaysec = atoi(value); uInfo("s3UploadDelaySec set from %d to %d", tsS3UploadDelaySec, newS3UploadDelaysec); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index e2c21ed012..46f7c6c3e2 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -1250,7 +1250,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag); if (code < 0) return code; - if (flag < 4 || flag > 1024 * 1024 * 1024) { + if (flag < 4 || flag > 1024 * 1024) { mError("dnode:%d, failed to config s3BlockCacheSize since value:%d. Valid range: [4, 1024 * 1024]", cfgReq.dnodeId, flag); terrno = TSDB_CODE_INVALID_CFG; @@ -1260,6 +1260,22 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { strcpy(dcfgReq.config, "s3blockcachesize"); snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); + } else if (strncasecmp(cfgReq.config, "s3pagecachesize", 16) == 0) { + int32_t optLen = strlen("s3pagecachesize"); + int32_t flag = -1; + int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag); + if (code < 0) return code; + + if (flag < 4 || flag > 1024 * 1024 * 1024) { + mError("dnode:%d, failed to config s3PageCacheSize since value:%d. Valid range: [4, 1024 * 1024]", cfgReq.dnodeId, + flag); + terrno = TSDB_CODE_INVALID_CFG; + tFreeSMCfgDnodeReq(&cfgReq); + return -1; + } + + strcpy(dcfgReq.config, "s3pagecachesize"); + snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); } else if (strncasecmp(cfgReq.config, "s3uploaddelaysec", 16) == 0) { int32_t optLen = strlen("s3uploaddelaysec"); int32_t flag = -1; diff --git a/source/dnode/vnode/src/inc/vndCos.h b/source/dnode/vnode/src/inc/vndCos.h index 6612b5c653..8581b039f8 100644 --- a/source/dnode/vnode/src/inc/vndCos.h +++ b/source/dnode/vnode/src/inc/vndCos.h @@ -27,6 +27,7 @@ extern "C" { extern int8_t tsS3Enabled; extern int32_t tsS3BlockSize; extern int32_t tsS3BlockCacheSize; +extern int32_t tsS3PageCacheSize; extern int32_t tsS3UploadDelaySec; int32_t s3Init(); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 94e6152487..bf6f0cf4d6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -92,7 +92,7 @@ static int32_t tsdbOpenPgCache(STsdb *pTsdb) { // SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5); int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; - SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * szPage, 0, .5); + SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3PageCacheSize * szPage, 0, .5); if (pCache == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; From 1cd12845202a6d3d1272f88c70aa3726a6234423 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 2 Nov 2023 12:43:06 +0800 Subject: [PATCH 14/25] config/pcache: default page cache to 4096 --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3c16554ac6..8e824b4d67 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -282,7 +282,7 @@ char tsS3Hostname[TSDB_FQDN_LEN] = ""; int32_t tsS3BlockSize = -1; // number of tsdb pages (4096) int32_t tsS3BlockCacheSize = 16; // number of blocks -int32_t tsS3PageCacheSize = 1024; // number of pages +int32_t tsS3PageCacheSize = 4096; // number of pages int32_t tsS3UploadDelaySec = 60 * 60; #ifndef _STORAGE From 159873692cfedac7bc855bd1eb3f6fa2575a0c5a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 2 Nov 2023 20:13:29 +0800 Subject: [PATCH 15/25] feat:checkpoint bakeup using rsync --- include/common/tglobal.h | 4 + .../src/inc/vndCos.h => include/util/cos.h | 3 +- include/util/rsync.h | 24 +++ source/common/src/tglobal.c | 10 +- source/dnode/snode/src/snode.c | 4 + source/dnode/vnode/CMakeLists.txt | 1 - source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- source/dnode/vnode/src/tsdb/tsdbFS2.c | 2 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRetention.c | 3 +- source/dnode/vnode/src/tsdb/tsdbWrite.c | 2 +- source/dnode/vnode/src/vnd/vnodeModule.c | 2 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/libs/stream/inc/streamInt.h | 11 ++ source/libs/stream/src/streamCheckpoint.c | 132 +++++++++++++ source/libs/stream/test/CMakeLists.txt | 11 ++ source/libs/stream/test/checkpointTest.cpp | 61 ++++++ .../src/vnd/vnodeCos.c => util/src/cos.c} | 2 +- source/util/src/rsync.c | 178 ++++++++++++++++++ 20 files changed, 445 insertions(+), 13 deletions(-) rename source/dnode/vnode/src/inc/vndCos.h => include/util/cos.h (96%) create mode 100644 include/util/rsync.h create mode 100644 source/libs/stream/test/checkpointTest.cpp rename source/{dnode/vnode/src/vnd/vnodeCos.c => util/src/cos.c} (99%) create mode 100644 source/util/src/rsync.c diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 3e29703070..ccae8f02e2 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -82,6 +82,10 @@ extern int32_t tsHeartbeatTimeout; // vnode extern int64_t tsVndCommitMaxIntervalMs; +// snode +extern char tsSnodeIp[]; +extern char tsCheckpointBackupDir[]; + // mnode extern int64_t tsMndSdbWriteDelta; extern int64_t tsMndLogRetention; diff --git a/source/dnode/vnode/src/inc/vndCos.h b/include/util/cos.h similarity index 96% rename from source/dnode/vnode/src/inc/vndCos.h rename to include/util/cos.h index bb4d284f0e..9b91afea8d 100644 --- a/source/dnode/vnode/src/inc/vndCos.h +++ b/include/util/cos.h @@ -16,7 +16,7 @@ #ifndef _TD_VND_COS_H_ #define _TD_VND_COS_H_ -#include "vnd.h" +#include "os.h" #ifdef __cplusplus extern "C" { @@ -24,6 +24,7 @@ extern "C" { #define S3_BLOCK_CACHE +extern int8_t tsS3StreamEnabled; extern int8_t tsS3Enabled; extern int32_t tsS3BlockSize; extern int32_t tsS3BlockCacheSize; diff --git a/include/util/rsync.h b/include/util/rsync.h new file mode 100644 index 0000000000..50b27b95e0 --- /dev/null +++ b/include/util/rsync.h @@ -0,0 +1,24 @@ +// +// Created by mingming wanng on 2023/11/2. +// + +#ifndef TDENGINE_RSYNC_H +#define TDENGINE_RSYNC_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "tarray.h" + +void stopRsync(); +void startRsync(); +int uploadRsync(char* id, SArray* fileList); +int downloadRsync(char* id, char* path); +int deleteRsync(char* id); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_RSYNC_H diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c6cff27011..241481df6a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -133,6 +133,10 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = ""; // bool tsSmlDataFormat = false; // int32_t tsSmlBatchSize = 10000; +// checkpoint backup +char tsSnodeIp[TSDB_FQDN_LEN] = {0}; +char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/"; + // tmq int32_t tmqMaxTopicNum = 20; // query @@ -275,6 +279,7 @@ char tsS3AccessKeySecret[TSDB_FQDN_LEN] = ""; char tsS3BucketName[TSDB_FQDN_LEN] = ""; char tsS3AppId[TSDB_FQDN_LEN] = ""; int8_t tsS3Enabled = false; +int8_t tsS3StreamEnabled = false; int8_t tsS3Https = true; char tsS3Hostname[TSDB_FQDN_LEN] = ""; @@ -338,9 +343,10 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { tstrncpy(tsS3AppId, appid + 1, TSDB_FQDN_LEN); } } - if (tsS3BucketName[0] != '<' && tsDiskCfgNum > 1) { + if (tsS3BucketName[0] != '<') { #if defined(USE_COS) || defined(USE_S3) - tsS3Enabled = true; + if(tsDiskCfgNum > 1) tsS3Enabled = true; + tsS3StreamEnabled = true; #endif } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 6451dba2da..9a6e51db31 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "rsync.h" #include "executor.h" #include "sndInt.h" #include "tstream.h" @@ -120,6 +121,9 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { goto FAIL; } + stopRsync(); + startRsync(); + // todo fix it: send msg to mnode to rollback to an existed checkpoint streamMetaInitForSnode(pSnode->pMeta); return pSnode; diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index dcc9f9a115..27cb0f93f7 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -8,7 +8,6 @@ set( "src/vnd/vnodeCommit.c" "src/vnd/vnodeQuery.c" "src/vnd/vnodeModule.c" - "src/vnd/vnodeCos.c" "src/vnd/vnodeSvr.c" "src/vnd/vnodeSync.c" "src/vnd/vnodeSnapshot.c" diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index ca3fb7027f..1e73fad8f6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -12,11 +12,11 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include "cos.h" #include "tsdb.h" #include "tsdbDataFileRW.h" #include "tsdbReadUtil.h" #include "vnd.h" -#include "vndCos.h" #define ROCKS_BATCH_SIZE (4096) diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 38d221d978..348397272d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -14,9 +14,9 @@ */ #include "tsdbFS2.h" +#include "cos.h" #include "tsdbUpgrade.h" #include "vnd.h" -#include "vndCos.h" #define BLOCK_COMMIT_FACTOR 3 diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index f3bcfef703..cc15fb85ca 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ +#include "cos.h" #include "tsdb.h" -#include "vndCos.h" static int32_t tsdbOpenFileImpl(STsdbFD *pFD) { int32_t code = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 6c41b46c73..2400c41d52 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -15,7 +15,8 @@ #include "tsdb.h" #include "tsdbFS2.h" -#include "vndCos.h" +#include "cos.h" +#include "vnd.h" typedef struct { STsdb *tsdb; diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 5949b103d5..e75079403e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ +#include "cos.h" #include "tsdb.h" -#include "vndCos.h" /** * @brief max key by precision diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index 6ccce5c9d7..df08fb8a2b 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ +#include "cos.h" #include "vnd.h" -#include "vndCos.h" typedef struct SVnodeTask SVnodeTask; struct SVnodeTask { diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 3bdecee79b..ff79e83d72 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -13,10 +13,10 @@ * along with this program. If not, see . */ +#include "cos.h" #include "sync.h" #include "tsdb.h" #include "vnd.h" -#include "vndCos.h" int32_t vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) { if (pTfs) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b6a4aaf388..8f6f5df850 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -16,7 +16,7 @@ #include "tencode.h" #include "tmsg.h" #include "vnd.h" -#include "vndCos.h" +#include "cos.h" #include "vnode.h" #include "vnodeInt.h" #include "audit.h" diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 4cd8319a07..806124bc58 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -137,6 +137,17 @@ void* streamQueueNextItem(SStreamQueue* pQueue); void streamFreeQitem(SStreamQueueItem* data); int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); +//#define CHECKPOINT_PATH_LEN 128 +//typedef struct SChekpointDataHeader{ +// int64_t size; +// char name[CHECKPOINT_PATH_LEN]; +// char id[CHECKPOINT_PATH_LEN]; +//} SChekpointDataHeader; + +int uploadCheckpoint(char* id, SArray* fileList); +int downloadCheckpoint(char* id, char* path); +int deleteCheckpoint(char* id); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 2cde368195..238c7c2329 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -14,6 +14,7 @@ */ #include "streamInt.h" +#include "rsync.h" int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -357,3 +358,134 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { return code; } + + +//static int64_t kBlockSize = 64 * 1024; +//static int sendCheckpointToS3(char* id, SArray* fileList){ +// code = s3PutObjectFromFile2(from->fname, object_name); +// return 0; +//} +//static int sendCheckpointToSnode(char* id, SArray* fileList){ +// if(strlen(id) >= CHECKPOINT_PATH_LEN){ +// tqError("uploadCheckpoint id name too long, name:%s", id); +// return -1; +// } +// uint8_t* buf = taosMemoryCalloc(1, sizeof(SChekpointDataHeader) + kBlockSize); +// if(buf == NULL){ +// tqError("uploadCheckpoint malloc failed"); +// return -1; +// } +// +// SChekpointDataHeader* pHdr = (SChekpointDataHeader*)buf; +// strcpy(pHdr->id, id); +// +// TdFilePtr fd = NULL; +// for(int i = 0; i < taosArrayGetSize(fileList); i++){ +// char* name = (char*)taosArrayGetP(fileList, i); +// if(strlen(name) >= CHECKPOINT_PATH_LEN){ +// tqError("uploadCheckpoint file name too long, name:%s", name); +// return -1; +// } +// int64_t offset = 0; +// +// fd = taosOpenFile(name, TD_FILE_READ); +// tqDebug("uploadCheckpoint open file %s, file index: %d", name, i); +// +// while(1){ +// int64_t nread = taosPReadFile(fd, buf + sizeof(SChekpointDataHeader), kBlockSize, offset); +// if (nread == -1) { +// taosCloseFile(&fd); +// taosMemoryFree(buf); +// tqError("uploadCheckpoint failed to read file name:%s,reason:%d", name, errno); +// return -1; +// } else if (nread == 0){ +// tqDebug("uploadCheckpoint no data read, close file:%s, move to next file, open and read", name); +// taosCloseFile(&fd); +// break; +// } else if (nread == kBlockSize){ +// offset += nread; +// } else { +// taosCloseFile(&fd); +// offset = 0; +// } +// tqDebug("uploadCheckpoint read file %s, size:%" PRId64 ", current offset:%" PRId64, name, nread, offset); +// +// +// pHdr->size = nread; +// strcpy(pHdr->name, name); +// +// SRpcMsg rpcMsg = {0}; +// int32_t bytes = sizeof(SChekpointDataHeader) + nread; +// rpcMsg.pCont = rpcMallocCont(bytes); +// rpcMsg.msgType = TDMT_SYNC_SNAPSHOT_SEND; +// rpcMsg.contLen = bytes; +// if (rpcMsg.pCont == NULL) { +// tqError("uploadCheckpoint malloc failed"); +// taosCloseFile(&fd); +// taosMemoryFree(buf); +// return -1; +// } +// memcpy(rpcMsg.pCont, buf, bytes); +// int try = 3; +// int32_t code = 0; +// while(try-- > 0){ +// code = tmsgSendReq(pEpSet, &rpcMsg); +// if(code == 0) +// break; +// taosMsleep(10); +// } +// if(code != 0){ +// tqError("uploadCheckpoint send request failed code:%d", code); +// taosCloseFile(&fd); +// taosMemoryFree(buf); +// return -1; +// } +// +// if(offset == 0){ +// break; +// } +// } +// } +// +// taosMemoryFree(buf); + +//} + +int uploadCheckpoint(char* id, SArray* fileList){ + if(id == NULL || fileList == NULL || strlen(id) == 0 || taosArrayGetSize(fileList) == 0){ + stError("uploadCheckpoint parameters invalid"); + return -1; + } + if(strlen(tsSnodeIp) != 0){ + uploadRsync(id, fileList); +// }else if(tsS3StreamEnabled){ + + } + return 0; +} + +int downloadCheckpoint(char* id, char* path){ + if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0){ + stError("downloadCheckpoint parameters invalid"); + return -1; + } + if(strlen(tsSnodeIp) != 0){ + downloadRsync(id, path); +// }else if(tsS3StreamEnabled){ + + } + return 0; +} + +int deleteCheckpoint(char* id){ + if(id == NULL || strlen(id) == 0){ + stError("deleteCheckpoint parameters invalid"); + return -1; + } + if(strlen(tsSnodeIp) != 0){ + deleteRsync(id); +// }else if(tsS3StreamEnabled){ + + } + return 0; +} diff --git a/source/libs/stream/test/CMakeLists.txt b/source/libs/stream/test/CMakeLists.txt index 629b04ae51..d756b71e29 100644 --- a/source/libs/stream/test/CMakeLists.txt +++ b/source/libs/stream/test/CMakeLists.txt @@ -18,6 +18,17 @@ TARGET_INCLUDE_DIRECTORIES( PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" ) +ADD_EXECUTABLE(checkpointTest checkpointTest.cpp) +TARGET_LINK_LIBRARIES( + checkpointTest + PUBLIC os common gtest stream executor qcom index transport util +) + +TARGET_INCLUDE_DIRECTORIES( + checkpointTest + PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" +) + add_test( NAME streamUpdateTest COMMAND streamUpdateTest diff --git a/source/libs/stream/test/checkpointTest.cpp b/source/libs/stream/test/checkpointTest.cpp new file mode 100644 index 0000000000..56614cc537 --- /dev/null +++ b/source/libs/stream/test/checkpointTest.cpp @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include + +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +#include "rsync.h" +#include "streamInt.h" + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + + strcpy(tsSnodeIp, "127.0.0.1"); + return RUN_ALL_TESTS(); +} + +TEST(testCase, checkpointUpload_Test) { + stopRsync(); + startRsync(); + + taosSsleep(5); + SArray* fileList = taosArrayInit(0, POINTER_BYTES); + char* id = "2013892036"; + char* file1 = "/Users/mingmingwanng/wal1"; + char* file2 = "/Users/mingmingwanng/java_error_in_clion.hprof"; + taosArrayPush(fileList, &file1); + taosArrayPush(fileList, &file2); + + uploadCheckpoint(id, fileList); +} + +TEST(testCase, checkpointDownload_Test) { + char* id = "2013892036"; + downloadRsync(id, "/Users/mingmingwanng/rsync/tmp"); +} + +TEST(testCase, checkpointDelete_Test) { + char* id = "2013892036"; + deleteRsync(id); +} diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/util/src/cos.c similarity index 99% rename from source/dnode/vnode/src/vnd/vnodeCos.c rename to source/util/src/cos.c index 6e36739f5a..23b2d53990 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/util/src/cos.c @@ -1,6 +1,6 @@ #define ALLOW_FORBID_FUNC -#include "vndCos.h" +#include "cos.h" extern char tsS3Endpoint[]; extern char tsS3AccessKeyId[]; diff --git a/source/util/src/rsync.c b/source/util/src/rsync.c new file mode 100644 index 0000000000..940c631c65 --- /dev/null +++ b/source/util/src/rsync.c @@ -0,0 +1,178 @@ +// +// Created by mingming wanng on 2023/11/2. +// +#include "rsync.h" +#include +#include "tglobal.h" + +#define ERRNO_ERR_FORMAT "errno:%d,msg:%s" +#define ERRNO_ERR_DATA errno,strerror(errno) + +// deleteRsync function produce empty directories, traverse base directory to remove them +static void removeEmptyDir(){ + TdDirPtr pDir = taosOpenDir(tsCheckpointBackupDir); + if (pDir == NULL) return; + + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { + if (!taosDirEntryIsDir(de)) { + continue; + } + + if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue; + + char filename[PATH_MAX] = {0}; + snprintf(filename, sizeof(filename), "%s%s", tsCheckpointBackupDir, taosGetDirEntryName(de)); + + TdDirPtr pDirTmp = taosOpenDir(filename); + TdDirEntryPtr deTmp = NULL; + bool empty = true; + while ((deTmp = taosReadDir(pDirTmp)) != NULL){ + if (strcmp(taosGetDirEntryName(deTmp), ".") == 0 || strcmp(taosGetDirEntryName(deTmp), "..") == 0) continue; + empty = false; + } + if(empty) taosRemoveDir(filename); + taosCloseDir(&pDirTmp); + } + + taosCloseDir(&pDir); +} + +static int generateConfigFile(char* confDir){ + TdFilePtr pFile = taosOpenFile(confDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) { + uError("[rsync] open conf file error, dir:%s,"ERRNO_ERR_FORMAT, confDir, ERRNO_ERR_DATA); + return -1; + } + + char confContent[PATH_MAX*4] = {0}; + snprintf(confContent, PATH_MAX*4, + "uid = root\n" + "gid = root\n" + "use chroot = false\n" + "max connections = 200\n" + "timeout = 100\n" + "lock file = %srsync.lock\n" + "log file = %srsync.log\n" + "ignore errors = true\n" + "read only = false\n" + "list = false\n" + "[checkpoint]\n" + "path = %s", tsCheckpointBackupDir, tsCheckpointBackupDir, tsCheckpointBackupDir); + uDebug("[rsync] conf:%s", confContent); + if (taosWriteFile(pFile, confContent, strlen(confContent)) <= 0){ + uError("[rsync] write conf file error,"ERRNO_ERR_FORMAT, ERRNO_ERR_DATA); + taosCloseFile(&pFile); + return -1; + } + + taosCloseFile(&pFile); + return 0; +} + +static int execCommand(char* command){ + int try = 3; + int32_t code = 0; + while(try-- > 0) { + code = system(command); + if (code == 0) { + break; + } + taosMsleep(10); + } + return code; +} + +void stopRsync(){ + int code = system("pkill rsync"); + if(code != 0){ + uError("[rsync] stop rsync server failed,"ERRNO_ERR_FORMAT, ERRNO_ERR_DATA); + return; + } + uDebug("[rsync] stop rsync server successful"); +} + +void startRsync(){ + if(taosMulMkDir(tsCheckpointBackupDir) != 0){ + uError("[rsync] build checkpoint backup dir failed, dir:%s,"ERRNO_ERR_FORMAT, tsCheckpointBackupDir, ERRNO_ERR_DATA); + return; + } + removeEmptyDir(); + + char confDir[PATH_MAX] = {0}; + snprintf(confDir, PATH_MAX, "%srsync.conf", tsCheckpointBackupDir); + + int code = generateConfigFile(confDir); + if(code != 0){ + return; + } + + char cmd[PATH_MAX] = {0}; + snprintf(cmd, PATH_MAX, "rsync --daemon --config=%s", confDir); + // start rsync service to backup checkpoint + code = system(cmd); + if(code != 0){ + uError("[rsync] start server failed, code:%d,"ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return; + } + uDebug("[rsync] start server successful"); +} + +int uploadRsync(char* id, SArray* fileList){ + for(int i = 0; i < taosArrayGetSize(fileList); i++) { + char* fullName = (char*)taosArrayGetP(fileList, i); + char command[PATH_MAX] = {0}; +// char* name = strrchr(fullName, '/'); +// if(name == NULL){ +// uError("[rsync] file name invalid, name:%s", name); +// return -1; +// } +// name = name + 1; + snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/", + fullName, tsSnodeIp, id); + + int code = execCommand(command); + if(code != 0){ + uError("[rsync] send failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return -1; + } + } + uDebug("[rsync] upload data:%s successful", id); + return 0; +} + +int downloadRsync(char* id, char* path){ + char command[PATH_MAX] = {0}; + snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", + tsSnodeIp, id, path); + + int code = execCommand(command); + if(code != 0){ + uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return -1; + } + uDebug("[rsync] down data:%s successful", id); + return 0; +} + +int deleteRsync(char* id){ + char* tmp = "./tmp_empty/"; + int code = taosMkDir(tmp); + if(code != 0){ + uError("[rsync] make tmp dir failed. code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return -1; + } + char command[PATH_MAX] = {0}; + snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/", + tmp, tsSnodeIp, id); + + code = execCommand(command); + taosRemoveDir(tmp); + if(code != 0){ + uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return -1; + } + uDebug("[rsync] delete data:%s successful", id); + + return 0; +} \ No newline at end of file From 4698e9270fe2c92a1f516fb8aabfda488f2c954b Mon Sep 17 00:00:00 2001 From: wangjiaming Date: Fri, 3 Nov 2023 10:26:13 +0800 Subject: [PATCH 16/25] Update 10-function.md --- docs/en/12-taos-sql/10-function.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md index 5be14093de..d1b4aedf50 100644 --- a/docs/en/12-taos-sql/10-function.md +++ b/docs/en/12-taos-sql/10-function.md @@ -540,7 +540,7 @@ TO_CHAR(ts, format_str_literal) - If we want to output some characters of format without converting, surround it with double quotes. `to_char(ts, 'yyyy-mm-dd "is formated by yyyy-mm-dd"')`. If want to output double quotes, add a back slash before double quote, like `to_char(ts, '\"yyyy-mm-dd\"')` will output `"2023-10-10"`. - For formats that output digits, the uppercase and lowercase formats are the same. - It's recommended to put time zone in the format, if not, the default time zone will be that in server or client. -- The precision of the input timestamp will be recognized automatically according to the precision of the table used. +- The precision of the input timestamp will be recognized automatically according to the precision of the table used, milli seconds will be used if no table is specified. #### TO_TIMESTAMP From 603bfe1a48df6dd2539d9d2c4a4b66afed5dd7f5 Mon Sep 17 00:00:00 2001 From: wangjiaming Date: Fri, 3 Nov 2023 10:27:12 +0800 Subject: [PATCH 17/25] Update 10-function.md --- docs/zh/12-taos-sql/10-function.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/12-taos-sql/10-function.md b/docs/zh/12-taos-sql/10-function.md index 723f299cbc..c1dc6a6363 100644 --- a/docs/zh/12-taos-sql/10-function.md +++ b/docs/zh/12-taos-sql/10-function.md @@ -540,7 +540,7 @@ TO_CHAR(ts, format_str_literal) - 时间格式中无法匹配规则的内容会直接输出. 如果想要在格式串中指定某些能够匹配规则的部分不做转换, 可以使用双引号, 如`to_char(ts, 'yyyy-mm-dd "is formated by yyyy-mm-dd"')`. 如果想要输出双引号, 那么在双引号之前加一个反斜杠, 如 `to_char(ts, '\"yyyy-mm-dd\"')` 将会输出 `"2023-10-10"`. - 那些输出是数字的格式, 如`YYYY`, `DD`, 大写与小写意义相同, 即`yyyy` 和 `YYYY` 可以互换. - 推荐在时间格式中带时区信息,如果不带则默认输出的时区为服务端或客户端所配置的时区. -- 输入时间戳的精度由所查询表的精度确定. +- 输入时间戳的精度由所查询表的精度确定, 若未指定表, 则精度为毫秒. #### TO_TIMESTAMP From 153bd80a4e744ecba5fabae58abb5297ebf4b1d4 Mon Sep 17 00:00:00 2001 From: dapan1121 <72057773+dapan1121@users.noreply.github.com> Date: Fri, 3 Nov 2023 10:27:18 +0800 Subject: [PATCH 18/25] Update 10-function.md --- docs/en/12-taos-sql/10-function.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md index d1b4aedf50..2ea144c56a 100644 --- a/docs/en/12-taos-sql/10-function.md +++ b/docs/en/12-taos-sql/10-function.md @@ -540,7 +540,7 @@ TO_CHAR(ts, format_str_literal) - If we want to output some characters of format without converting, surround it with double quotes. `to_char(ts, 'yyyy-mm-dd "is formated by yyyy-mm-dd"')`. If want to output double quotes, add a back slash before double quote, like `to_char(ts, '\"yyyy-mm-dd\"')` will output `"2023-10-10"`. - For formats that output digits, the uppercase and lowercase formats are the same. - It's recommended to put time zone in the format, if not, the default time zone will be that in server or client. -- The precision of the input timestamp will be recognized automatically according to the precision of the table used, milli seconds will be used if no table is specified. +- The precision of the input timestamp will be recognized automatically according to the precision of the table used, milliseconds will be used if no table is specified. #### TO_TIMESTAMP From 3ac11eba0eceeae8e2a9785c806c8e66aa9f472f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 3 Nov 2023 11:33:18 +0800 Subject: [PATCH 19/25] fix:modify upload checkpoint interface --- include/util/rsync.h | 2 +- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamCheckpoint.c | 6 +++--- source/libs/stream/test/checkpointTest.cpp | 7 +------ source/util/src/rsync.c | 24 ++++++++++++---------- 5 files changed, 19 insertions(+), 22 deletions(-) diff --git a/include/util/rsync.h b/include/util/rsync.h index 50b27b95e0..6cce645d1e 100644 --- a/include/util/rsync.h +++ b/include/util/rsync.h @@ -13,7 +13,7 @@ extern "C" { void stopRsync(); void startRsync(); -int uploadRsync(char* id, SArray* fileList); +int uploadRsync(char* id, char* path); int downloadRsync(char* id, char* path); int deleteRsync(char* id); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 806124bc58..85e4e923e5 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -144,7 +144,7 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); // char id[CHECKPOINT_PATH_LEN]; //} SChekpointDataHeader; -int uploadCheckpoint(char* id, SArray* fileList); +int uploadCheckpoint(char* id, char* path); int downloadCheckpoint(char* id, char* path); int deleteCheckpoint(char* id); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 238c7c2329..e02a591cff 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -451,13 +451,13 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { //} -int uploadCheckpoint(char* id, SArray* fileList){ - if(id == NULL || fileList == NULL || strlen(id) == 0 || taosArrayGetSize(fileList) == 0){ +int uploadCheckpoint(char* id, char* path){ + if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0){ stError("uploadCheckpoint parameters invalid"); return -1; } if(strlen(tsSnodeIp) != 0){ - uploadRsync(id, fileList); + uploadRsync(id, path); // }else if(tsS3StreamEnabled){ } diff --git a/source/libs/stream/test/checkpointTest.cpp b/source/libs/stream/test/checkpointTest.cpp index 56614cc537..6b53f13c71 100644 --- a/source/libs/stream/test/checkpointTest.cpp +++ b/source/libs/stream/test/checkpointTest.cpp @@ -40,14 +40,9 @@ TEST(testCase, checkpointUpload_Test) { startRsync(); taosSsleep(5); - SArray* fileList = taosArrayInit(0, POINTER_BYTES); char* id = "2013892036"; - char* file1 = "/Users/mingmingwanng/wal1"; - char* file2 = "/Users/mingmingwanng/java_error_in_clion.hprof"; - taosArrayPush(fileList, &file1); - taosArrayPush(fileList, &file2); - uploadCheckpoint(id, fileList); + uploadCheckpoint(id, "/Users/mingmingwanng/rsync/"); } TEST(testCase, checkpointDownload_Test) { diff --git a/source/util/src/rsync.c b/source/util/src/rsync.c index 940c631c65..b31526dc48 100644 --- a/source/util/src/rsync.c +++ b/source/util/src/rsync.c @@ -118,24 +118,26 @@ void startRsync(){ uDebug("[rsync] start server successful"); } -int uploadRsync(char* id, SArray* fileList){ - for(int i = 0; i < taosArrayGetSize(fileList); i++) { - char* fullName = (char*)taosArrayGetP(fileList, i); - char command[PATH_MAX] = {0}; +int uploadRsync(char* id, char* path){ + char command[PATH_MAX] = {0}; // char* name = strrchr(fullName, '/'); // if(name == NULL){ // uError("[rsync] file name invalid, name:%s", name); // return -1; // } // name = name + 1; - snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/", - fullName, tsSnodeIp, id); + if(path[strlen(path) - 1] != '/'){ + snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s/ rsync://%s/checkpoint/%s/", + path, tsSnodeIp, id); + }else{ + snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/", + path, tsSnodeIp, id); + } - int code = execCommand(command); - if(code != 0){ - uError("[rsync] send failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); - return -1; - } + int code = execCommand(command); + if(code != 0){ + uError("[rsync] send failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return -1; } uDebug("[rsync] upload data:%s successful", id); return 0; From 5bd0745a762682e2ebbed243a9e837e2f836654b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 3 Nov 2023 11:46:21 +0800 Subject: [PATCH 20/25] fix:add config for checkpoint --- source/common/src/tglobal.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 0eebb9ff22..39410dc8ae 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -657,6 +657,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddString(pCfg, "snodeIp", tsSnodeIp, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1; @@ -1081,6 +1084,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32; tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32; tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); + tstrncpy(tsSnodeIp, cfgGetItem(pCfg, "snodeIp")->str, TSDB_FQDN_LEN); + tstrncpy(tsCheckpointBackupDir, cfgGetItem(pCfg, "checkpointBackupDir")->str, PATH_MAX); tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32; From 7a39b3296157439d66239e4d48ca1eb96635c4ea Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 3 Nov 2023 16:33:48 +0800 Subject: [PATCH 21/25] fix:add windows support --- source/common/src/tglobal.c | 4 ++++ source/util/src/rsync.c | 15 ++++++++------- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 39410dc8ae..639e300179 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -135,7 +135,11 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = ""; // checkpoint backup char tsSnodeIp[TSDB_FQDN_LEN] = {0}; +#ifdef WINDOWS +char tsCheckpointBackupDir[PATH_MAX] = "/c/TDengine/data/backup/checkpoint/"; +#else char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/"; +#endif // tmq int32_t tmqMaxTopicNum = 20; diff --git a/source/util/src/rsync.c b/source/util/src/rsync.c index b31526dc48..e3908e53ba 100644 --- a/source/util/src/rsync.c +++ b/source/util/src/rsync.c @@ -47,8 +47,10 @@ static int generateConfigFile(char* confDir){ char confContent[PATH_MAX*4] = {0}; snprintf(confContent, PATH_MAX*4, +#ifndef WINDOWS "uid = root\n" "gid = root\n" +#endif "use chroot = false\n" "max connections = 200\n" "timeout = 100\n" @@ -84,7 +86,12 @@ static int execCommand(char* command){ } void stopRsync(){ - int code = system("pkill rsync"); + int code = +#ifdef WINDOWS + system("taskkill /f /mi rsync.exe"); +#else + system("pkill rsync"); +#endif if(code != 0){ uError("[rsync] stop rsync server failed,"ERRNO_ERR_FORMAT, ERRNO_ERR_DATA); return; @@ -120,12 +127,6 @@ void startRsync(){ int uploadRsync(char* id, char* path){ char command[PATH_MAX] = {0}; -// char* name = strrchr(fullName, '/'); -// if(name == NULL){ -// uError("[rsync] file name invalid, name:%s", name); -// return -1; -// } -// name = name + 1; if(path[strlen(path) - 1] != '/'){ snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s/ rsync://%s/checkpoint/%s/", path, tsSnodeIp, id); From 572dd8ec3953ea1e878f689b2759c8fc9642ff94 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 3 Nov 2023 17:14:04 +0800 Subject: [PATCH 22/25] fix:add windows support --- source/common/src/tglobal.c | 2 +- source/util/src/rsync.c | 30 ++++++++++++++++++++++++++++-- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 639e300179..51c141c754 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -136,7 +136,7 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = ""; // checkpoint backup char tsSnodeIp[TSDB_FQDN_LEN] = {0}; #ifdef WINDOWS -char tsCheckpointBackupDir[PATH_MAX] = "/c/TDengine/data/backup/checkpoint/"; +char tsCheckpointBackupDir[PATH_MAX] = "C:\TDengine\data\backup\checkpoint\"; #else char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/"; #endif diff --git a/source/util/src/rsync.c b/source/util/src/rsync.c index e3908e53ba..6316be7f79 100644 --- a/source/util/src/rsync.c +++ b/source/util/src/rsync.c @@ -38,6 +38,21 @@ static void removeEmptyDir(){ taosCloseDir(&pDir); } +#ifdef WINDOWS +// C:\TDengine\data\backup\checkpoint\ -> /c/TDengine/data/backup/checkpoint/ +static void changeDirFromWindowsToLinux(char* from, char* to){ + to[0] = '/'; + to[1] = from[0]; + for(int i = 2; i < strlen(from); i++) { + if (from[i] == '\\') { + to[i] = '/'; + } else { + to[i] = from[i]; + } + } +} +#endif + static int generateConfigFile(char* confDir){ TdFilePtr pFile = taosOpenFile(confDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { @@ -45,6 +60,11 @@ static int generateConfigFile(char* confDir){ return -1; } +#ifdef WINDOWS + char path[PATH_MAX] = {0}; + changeDirFromWindowsToLinux(tsCheckpointBackupDir, path); +#endif + char confContent[PATH_MAX*4] = {0}; snprintf(confContent, PATH_MAX*4, #ifndef WINDOWS @@ -60,7 +80,13 @@ static int generateConfigFile(char* confDir){ "read only = false\n" "list = false\n" "[checkpoint]\n" - "path = %s", tsCheckpointBackupDir, tsCheckpointBackupDir, tsCheckpointBackupDir); + "path = %s", tsCheckpointBackupDir, tsCheckpointBackupDir, +#ifdef WINDOWS + path +#else + tsCheckpointBackupDir +#endif + ); uDebug("[rsync] conf:%s", confContent); if (taosWriteFile(pFile, confContent, strlen(confContent)) <= 0){ uError("[rsync] write conf file error,"ERRNO_ERR_FORMAT, ERRNO_ERR_DATA); @@ -88,7 +114,7 @@ static int execCommand(char* command){ void stopRsync(){ int code = #ifdef WINDOWS - system("taskkill /f /mi rsync.exe"); + system("taskkill /f /im rsync.exe"); #else system("pkill rsync"); #endif From 16f8c1f6280aa49e804a899ede70dae092cdd039 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 3 Nov 2023 17:17:43 +0800 Subject: [PATCH 23/25] fix:add windows support --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 51c141c754..3593665de9 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -136,7 +136,7 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = ""; // checkpoint backup char tsSnodeIp[TSDB_FQDN_LEN] = {0}; #ifdef WINDOWS -char tsCheckpointBackupDir[PATH_MAX] = "C:\TDengine\data\backup\checkpoint\"; +char tsCheckpointBackupDir[PATH_MAX] = "C:\\TDengine\\data\\backup\\checkpoint\\"; #else char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/"; #endif From 5a6c3f2b048de3eaeb415be26b8f7e55aecf3fae Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 3 Nov 2023 17:45:37 +0800 Subject: [PATCH 24/25] fix:add windows support --- source/libs/stream/src/streamCheckpoint.c | 4 +-- source/util/src/rsync.c | 30 ++++++++++++++++++++--- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b27529d5b8..b30a3bdc50 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -467,7 +467,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { //} int uploadCheckpoint(char* id, char* path){ - if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0){ + if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){ stError("uploadCheckpoint parameters invalid"); return -1; } @@ -480,7 +480,7 @@ int uploadCheckpoint(char* id, char* path){ } int downloadCheckpoint(char* id, char* path){ - if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0){ + if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){ stError("downloadCheckpoint parameters invalid"); return -1; } diff --git a/source/util/src/rsync.c b/source/util/src/rsync.c index 6316be7f79..8c6bce66a0 100644 --- a/source/util/src/rsync.c +++ b/source/util/src/rsync.c @@ -152,13 +152,27 @@ void startRsync(){ } int uploadRsync(char* id, char* path){ +#ifdef WINDOWS + char pathTransform[PATH_MAX] = {0}; + changeDirFromWindowsToLinux(path, pathTransform); +#endif char command[PATH_MAX] = {0}; if(path[strlen(path) - 1] != '/'){ snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s/ rsync://%s/checkpoint/%s/", - path, tsSnodeIp, id); +#ifdef WINDOWS + pathTransform +#else + path +#endif + , tsSnodeIp, id); }else{ snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/", - path, tsSnodeIp, id); +#ifdef WINDOWS + pathTransform +#else + path +#endif + , tsSnodeIp, id); } int code = execCommand(command); @@ -171,9 +185,19 @@ int uploadRsync(char* id, char* path){ } int downloadRsync(char* id, char* path){ +#ifdef WINDOWS + char pathTransform[PATH_MAX] = {0}; + changeDirFromWindowsToLinux(path, pathTransform); +#endif char command[PATH_MAX] = {0}; snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", - tsSnodeIp, id, path); + tsSnodeIp, id, +#ifdef WINDOWS + pathTransform +#else + path +#endif + ); int code = execCommand(command); if(code != 0){ From b1515dd069cebda55cb3b51b7e88c1d7722ef226 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 3 Nov 2023 17:56:34 +0800 Subject: [PATCH 25/25] fix:add windows support --- source/util/src/rsync.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/util/src/rsync.c b/source/util/src/rsync.c index 8c6bce66a0..cbb447a33e 100644 --- a/source/util/src/rsync.c +++ b/source/util/src/rsync.c @@ -157,7 +157,11 @@ int uploadRsync(char* id, char* path){ changeDirFromWindowsToLinux(path, pathTransform); #endif char command[PATH_MAX] = {0}; +#ifdef WINDOWS + if(pathTransform[strlen(pathTransform) - 1] != '/'){ +#else if(path[strlen(path) - 1] != '/'){ +#endif snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s/ rsync://%s/checkpoint/%s/", #ifdef WINDOWS pathTransform