From 55ec5f164cae5f127c260e42c7677b64ac72402c Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 26 Apr 2024 15:09:53 +0800 Subject: [PATCH 1/9] enh: batch create table --- include/common/tglobal.h | 1 + source/common/src/tglobal.c | 5 +++++ source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 7 ++++++- .../0-others/test_hot_refresh_configurations.py | 6 ++++++ 4 files changed, 18 insertions(+), 1 deletion(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 3b8929f241..6c8448a6be 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -178,6 +178,7 @@ extern int32_t tsMetaCacheMaxSize; extern int32_t tsSlowLogThreshold; extern int32_t tsSlowLogScope; extern int32_t tsTimeSeriesThreshold; +extern int64_t tsTimeSeriesInterval; extern bool tsMultiResultFunctionStarReturnTags; // client diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d34e23c0ba..9a5392ca1f 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -181,6 +181,7 @@ int32_t tsMetaCacheMaxSize = -1; // MB int32_t tsSlowLogThreshold = 3; // seconds int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL; int32_t tsTimeSeriesThreshold = 50; +int64_t tsTimeSeriesInterval = 10; // ms bool tsMultiResultFunctionStarReturnTags = false; /* @@ -783,6 +784,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddInt64(pCfg, "timeseriesInterval", tsTimeSeriesInterval, 1, 100, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) + return -1; if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) @@ -1239,6 +1242,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32; tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32; tsTimeSeriesThreshold = cfgGetItem(pCfg, "timeseriesThreshold")->i32; + tsTimeSeriesInterval = cfgGetItem(pCfg, "timeseriesInterval")->i64; tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64; @@ -1557,6 +1561,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"numOfLogLines", &tsNumOfLogLines}, {"queryRspPolicy", &tsQueryRspPolicy}, {"timeseriesThreshold", &tsTimeSeriesThreshold}, + {"timeseriesInterval", &tsTimeSeriesInterval}, {"tmqMaxTopicNum", &tmqMaxTopicNum}, {"tmqRowSize", &tmqRowSize}, {"transPullupInterval", &tsTransPullupInterval}, diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index d124eb74be..934d4595ad 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -56,12 +56,17 @@ static void *dmNotifyThreadFp(void *param) { return NULL; } - bool wait = true; + bool wait = true; + int64_t lastNotify = 0; while (1) { if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; if (wait) tsem_wait(&dmNotifyHdl.sem); atomic_store_8(&dmNotifyHdl.state, 1); + if (taosGetTimestampMs() - lastNotify < tsTimeSeriesInterval) { + taosMsleep(tsTimeSeriesInterval); + } dmSendNotifyReq(pMgmt); + lastNotify = taosGetTimestampMs(); if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) { wait = true; continue; diff --git a/tests/system-test/0-others/test_hot_refresh_configurations.py b/tests/system-test/0-others/test_hot_refresh_configurations.py index 71f6290469..759d6074f5 100644 --- a/tests/system-test/0-others/test_hot_refresh_configurations.py +++ b/tests/system-test/0-others/test_hot_refresh_configurations.py @@ -96,6 +96,12 @@ class TDTestCase: "values": [0, 200, 2000], "except_values": [-2, 2001] }, + { + "name": "timeseriesInterval", + "alias": "tsTimeSeriesInterval", + "values": [1, 10, 100], + "except_values": [-2, 2001] + }, { "name": "minDiskFreeSize", "alias": "tsMinDiskFreeSize", From 00953afd780ab7f66fa95c01fc7810c461a37065 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 26 Apr 2024 15:12:11 +0800 Subject: [PATCH 2/9] enh: batch create table --- tests/system-test/0-others/test_hot_refresh_configurations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/0-others/test_hot_refresh_configurations.py b/tests/system-test/0-others/test_hot_refresh_configurations.py index 759d6074f5..3972ecb56b 100644 --- a/tests/system-test/0-others/test_hot_refresh_configurations.py +++ b/tests/system-test/0-others/test_hot_refresh_configurations.py @@ -100,7 +100,7 @@ class TDTestCase: "name": "timeseriesInterval", "alias": "tsTimeSeriesInterval", "values": [1, 10, 100], - "except_values": [-2, 2001] + "except_values": [-1, 0, 101] }, { "name": "minDiskFreeSize", From 504263315d5edd2e1dc18768f9d95d65d1f3ef20 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sun, 28 Apr 2024 04:37:30 +0800 Subject: [PATCH 3/9] enh: batch create table --- include/common/tglobal.h | 1 - include/common/tgrant.h | 11 +-- source/common/src/tglobal.c | 5 -- source/common/src/tmsg.c | 9 ++- source/dnode/mgmt/mgmt_dnode/inc/dmInt.h | 2 +- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 20 ++---- source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 72 +++++++++++++++++-- source/dnode/mgmt/node_util/inc/dmUtil.h | 1 + source/dnode/mgmt/node_util/src/dmEps.c | 8 +++ source/dnode/mnode/impl/src/mndGrant.c | 1 + .../test_hot_refresh_configurations.py | 6 -- 11 files changed, 91 insertions(+), 45 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 6c8448a6be..3b8929f241 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -178,7 +178,6 @@ extern int32_t tsMetaCacheMaxSize; extern int32_t tsSlowLogThreshold; extern int32_t tsSlowLogScope; extern int32_t tsTimeSeriesThreshold; -extern int64_t tsTimeSeriesInterval; extern bool tsMultiResultFunctionStarReturnTags; // client diff --git a/include/common/tgrant.h b/include/common/tgrant.h index c1e37787c2..f7759177da 100644 --- a/include/common/tgrant.h +++ b/include/common/tgrant.h @@ -58,11 +58,12 @@ typedef enum { TSDB_GRANT_BACKUP_RESTORE, } EGrantType; -int32_t checkAndGetCryptKey(const char *encryptCode, const char *machineId, char **key); -int32_t generateEncryptCode(const char *key, const char *machineId, char **encryptCode); -int32_t grantCheck(EGrantType grant); -int32_t grantCheckExpire(EGrantType grant); -char *tGetMachineId(); +int32_t checkAndGetCryptKey(const char *encryptCode, const char *machineId, char **key); +int32_t generateEncryptCode(const char *key, const char *machineId, char **encryptCode); +uint64_t grantRemain(EGrantType grant); +int32_t grantCheck(EGrantType grant); +int32_t grantCheckExpire(EGrantType grant); +char *tGetMachineId(); // #ifndef GRANTS_CFG #ifdef TD_ENTERPRISE diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 1ac762e004..ba96dc0adf 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -181,7 +181,6 @@ int32_t tsMetaCacheMaxSize = -1; // MB int32_t tsSlowLogThreshold = 3; // seconds int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL; int32_t tsTimeSeriesThreshold = 50; -int64_t tsTimeSeriesInterval = 10; // ms bool tsMultiResultFunctionStarReturnTags = false; /* @@ -784,8 +783,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; - if (cfgAddInt64(pCfg, "timeseriesInterval", tsTimeSeriesInterval, 1, 100, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) - return -1; if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) @@ -1242,7 +1239,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32; tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32; tsTimeSeriesThreshold = cfgGetItem(pCfg, "timeseriesThreshold")->i32; - tsTimeSeriesInterval = cfgGetItem(pCfg, "timeseriesInterval")->i64; tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64; @@ -1561,7 +1557,6 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"numOfLogLines", &tsNumOfLogLines}, {"queryRspPolicy", &tsQueryRspPolicy}, {"timeseriesThreshold", &tsTimeSeriesThreshold}, - {"timeseriesInterval", &tsTimeSeriesInterval}, {"tmqMaxTopicNum", &tmqMaxTopicNum}, {"tmqRowSize", &tmqRowSize}, {"transPullupInterval", &tsTransPullupInterval}, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 45b0b6ac2b..3836f13a2f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1161,16 +1161,15 @@ int32_t tDeserializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) { int32_t nVgroup = 0; if (tDecodeI32(&decoder, &nVgroup) < 0) goto _exit; if (nVgroup > 0) { - pReq->pVloads = taosArrayInit(nVgroup, sizeof(SVnodeLoadLite)); + pReq->pVloads = taosArrayInit_s(sizeof(SVnodeLoadLite), nVgroup); if (!pReq->pVloads) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } for (int32_t i = 0; i < nVgroup; ++i) { - SVnodeLoadLite vload; - if (tDecodeI32(&decoder, &(vload.vgId)) < 0) goto _exit; - if (tDecodeI64(&decoder, &(vload.nTimeSeries)) < 0) goto _exit; - taosArrayPush(pReq->pVloads, &vload); + SVnodeLoadLite *vload = TARRAY_GET_ELEM(pReq->pVloads, i); + if (tDecodeI32(&decoder, &(vload->vgId)) < 0) goto _exit; + if (tDecodeI64(&decoder, &(vload->nTimeSeries)) < 0) goto _exit; } } diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index 9e1fe69714..46f8dd06d4 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -49,7 +49,7 @@ typedef struct SDnodeMgmt { // dmHandle.c SArray *dmGetMsgHandles(); void dmSendStatusReq(SDnodeMgmt *pMgmt); -void dmSendNotifyReq(SDnodeMgmt *pMgmt); +void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq); int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 56fdb463c4..91d73c9dd7 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -169,22 +169,10 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { dmProcessStatusRsp(pMgmt, &rpcRsp); } -void dmSendNotifyReq(SDnodeMgmt *pMgmt) { - SNotifyReq req = {0}; - - taosThreadRwlockRdlock(&pMgmt->pData->lock); - req.dnodeId = pMgmt->pData->dnodeId; - taosThreadRwlockUnlock(&pMgmt->pData->lock); - - req.clusterId = pMgmt->pData->clusterId; - - SMonVloadInfo vinfo = {0}; - (*pMgmt->getVnodeLoadsLiteFp)(&vinfo); - req.pVloads = vinfo.pVloads; - - int32_t contLen = tSerializeSNotifyReq(NULL, 0, &req); - void * pHead = rpcMallocCont(contLen); - tSerializeSNotifyReq(pHead, contLen, &req); +void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) { + int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq); + void *pHead = rpcMallocCont(contLen); + tSerializeSNotifyReq(pHead, contLen, pReq); tFreeSNotifyReq(&req); SRpcMsg rpcMsg = {.pCont = pHead, diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 934d4595ad..48a0c6b755 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "dmInt.h" +#include "tgrant.h" #include "thttp.h" static void *dmStatusThreadFp(void *param) { @@ -47,26 +48,85 @@ static void *dmStatusThreadFp(void *param) { } SDmNotifyHandle dmNotifyHdl = {.state = 0}; - +#define TIMESERIES_STASH_NUM 5 static void *dmNotifyThreadFp(void *param) { SDnodeMgmt *pMgmt = param; + int64_t lastTime = taosGetTimestampMs(); setThreadName("dnode-notify"); if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) { return NULL; } - bool wait = true; - int64_t lastNotify = 0; + // calculate approximate timeSeries per second + int64_t notifyTimeStamp[TIMESERIES_STASH_NUM]; + int64_t notifyTimeSeries[TIMESERIES_STASH_NUM]; + uint64_t nTotalNotify = 0; + int32_t head = -1; + int32_t tail = 0; + + bool wait = true; + int32_t nDnode = 0; + int64_t lastNotify = 0; + SNotifyReq req = {0}; while (1) { if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; if (wait) tsem_wait(&dmNotifyHdl.sem); atomic_store_8(&dmNotifyHdl.state, 1); - if (taosGetTimestampMs() - lastNotify < tsTimeSeriesInterval) { - taosMsleep(tsTimeSeriesInterval); + + uint64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES); + if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) { + goto _skip; } - dmSendNotifyReq(pMgmt); + int64_t current = taosGetTimestampMs(); + if (current - lastNotify > 1000) { + nDnode = dmGetDnodeSize(pMgmt->pData); + } + if (req.dnodeId == 0 || req.clusterId == 0) { + req.dnodeId = pMgmt->pData->dnodeId; + req.clusterId = pMgmt->pData->clusterId; + } + + if (current - lastNotify < 10) { + if (remainTimeSeries > 1000000) { + taosMsleep(10); + } else if (remainTimeSeries > 500000) { + taosMsleep(5); + } else { + taosMsleep(2); + } + } + + SMonVloadInfo vinfo = {0}; + (*pMgmt->getVnodeLoadsLiteFp)(&vinfo); + req.pVloads = vinfo.pVloads; + int32_t nVgroup = taosArrayGetSize(req.pVloads); + int64_t nTimeSeries = 0; + for (int32_t i = 0; i < nVgroup; ++i) { + SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i); + nTimeSeries += vload->nTimeSeries; + } + notifyTimeSeries[tail] = nTimeSeries; + notifyTimeStamp[tail] = taosGetTimestampNs(); + ++nTotalNotify; + + uint64_t approximateTimeSeries = 0; + if (nTotalNotify >= TIMESERIES_STASH_NUM) { + int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head]; + int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head]; + if (timeDiff > 0 && timeDiff < 1e9 && tsDiff > 0) { + approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff; + } + } + if (++head == TIMESERIES_STASH_NUM) head = 0; + if (++tail == TIMESERIES_STASH_NUM) tail = 0; + + if ((approximateTimeSeries * nDnode) > remainTimeSeries) { + dmSendNotifyReq(pMgmt, &req); + } + tFreeSNotifyReq(&req); lastNotify = taosGetTimestampMs(); + _skip: if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) { wait = true; continue; diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 0344ca685d..9fdd5e50ed 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -182,6 +182,7 @@ TdFilePtr dmCheckRunning(const char *dataDir); int32_t dmInitDndInfo(SDnodeData *pData); // dmEps.c +int32_t dmGetDnodeSize(SDnodeData *pData); int32_t dmReadEps(SDnodeData *pData); int32_t dmWriteEps(SDnodeData *pData); void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps); diff --git a/source/dnode/mgmt/node_util/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c index 4b41b17cb1..eccd556eea 100644 --- a/source/dnode/mgmt/node_util/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -355,6 +355,14 @@ _OVER: return code; } +int32_t dmGetDnodeSize(SDnodeData *pData) { + int32_t size = 0; + taosThreadRwlockWrlock(&pData->lock); + size = taosArrayGetSize(pData->dnodeEps); + taosThreadRwlockUnlock(&pData->lock); + return size; +} + void dmUpdateEps(SDnodeData *pData, SArray *eps) { taosThreadRwlockWrlock(&pData->lock); dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer); diff --git a/source/dnode/mnode/impl/src/mndGrant.c b/source/dnode/mnode/impl/src/mndGrant.c index cce386785a..8dae4b3c11 100644 --- a/source/dnode/mnode/impl/src/mndGrant.c +++ b/source/dnode/mnode/impl/src/mndGrant.c @@ -77,6 +77,7 @@ void grantParseParameter() { mError("can't parsed parameter k"); } void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value) {} void grantAdd(EGrantType grant, uint64_t value) {} void grantRestore(EGrantType grant, uint64_t value) {} +int64_t grantRemain(EGrantType grant) { return 0; } char *tGetMachineId() { return NULL; }; int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } diff --git a/tests/system-test/0-others/test_hot_refresh_configurations.py b/tests/system-test/0-others/test_hot_refresh_configurations.py index 3972ecb56b..71f6290469 100644 --- a/tests/system-test/0-others/test_hot_refresh_configurations.py +++ b/tests/system-test/0-others/test_hot_refresh_configurations.py @@ -96,12 +96,6 @@ class TDTestCase: "values": [0, 200, 2000], "except_values": [-2, 2001] }, - { - "name": "timeseriesInterval", - "alias": "tsTimeSeriesInterval", - "values": [1, 10, 100], - "except_values": [-1, 0, 101] - }, { "name": "minDiskFreeSize", "alias": "tsMinDiskFreeSize", From 3b504e6e968e0ff7a9ea05a7bd2ff0cf246acefe Mon Sep 17 00:00:00 2001 From: kailixu Date: Sun, 28 Apr 2024 04:47:00 +0800 Subject: [PATCH 4/9] enh: batch create table --- include/common/tgrant.h | 12 ++++++------ source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/include/common/tgrant.h b/include/common/tgrant.h index f7759177da..5a2ed58045 100644 --- a/include/common/tgrant.h +++ b/include/common/tgrant.h @@ -58,12 +58,12 @@ typedef enum { TSDB_GRANT_BACKUP_RESTORE, } EGrantType; -int32_t checkAndGetCryptKey(const char *encryptCode, const char *machineId, char **key); -int32_t generateEncryptCode(const char *key, const char *machineId, char **encryptCode); -uint64_t grantRemain(EGrantType grant); -int32_t grantCheck(EGrantType grant); -int32_t grantCheckExpire(EGrantType grant); -char *tGetMachineId(); +int32_t checkAndGetCryptKey(const char *encryptCode, const char *machineId, char **key); +int32_t generateEncryptCode(const char *key, const char *machineId, char **encryptCode); +int64_t grantRemain(EGrantType grant); +int32_t grantCheck(EGrantType grant); +int32_t grantCheckExpire(EGrantType grant); +char *tGetMachineId(); // #ifndef GRANTS_CFG #ifdef TD_ENTERPRISE diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 48a0c6b755..5ee4434b07 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -74,7 +74,7 @@ static void *dmNotifyThreadFp(void *param) { if (wait) tsem_wait(&dmNotifyHdl.sem); atomic_store_8(&dmNotifyHdl.state, 1); - uint64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES); + int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES); if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) { goto _skip; } @@ -110,7 +110,7 @@ static void *dmNotifyThreadFp(void *param) { notifyTimeStamp[tail] = taosGetTimestampNs(); ++nTotalNotify; - uint64_t approximateTimeSeries = 0; + int64_t approximateTimeSeries = 0; if (nTotalNotify >= TIMESERIES_STASH_NUM) { int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head]; int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head]; From 53e9e0b3aff3a67dc9b3be6cfde52dca57c6cf11 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sun, 28 Apr 2024 04:58:05 +0800 Subject: [PATCH 5/9] enh: batch create table --- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 91d73c9dd7..a17db20773 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -173,7 +173,6 @@ void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) { int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq); void *pHead = rpcMallocCont(contLen); tSerializeSNotifyReq(pHead, contLen, pReq); - tFreeSNotifyReq(&req); SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, From e392b393237b5c3016982c88d423febff1539698 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sun, 28 Apr 2024 10:24:16 +0800 Subject: [PATCH 6/9] enh: batch create table --- source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 26 +++++++++++++-------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 5ee4434b07..64588c6790 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -61,9 +61,9 @@ static void *dmNotifyThreadFp(void *param) { // calculate approximate timeSeries per second int64_t notifyTimeStamp[TIMESERIES_STASH_NUM]; int64_t notifyTimeSeries[TIMESERIES_STASH_NUM]; + int64_t approximateTimeSeries = 0; uint64_t nTotalNotify = 0; - int32_t head = -1; - int32_t tail = 0; + int32_t head, tail = 0; bool wait = true; int32_t nDnode = 0; @@ -81,6 +81,7 @@ static void *dmNotifyThreadFp(void *param) { int64_t current = taosGetTimestampMs(); if (current - lastNotify > 1000) { nDnode = dmGetDnodeSize(pMgmt->pData); + if (nDnode < 1) nDnode = 1; } if (req.dnodeId == 0 || req.clusterId == 0) { req.dnodeId = pMgmt->pData->dnodeId; @@ -88,9 +89,11 @@ static void *dmNotifyThreadFp(void *param) { } if (current - lastNotify < 10) { - if (remainTimeSeries > 1000000) { + int64_t nCmprTimeSeries = approximateTimeSeries / 100; + if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5; + if (remainTimeSeries > nCmprTimeSeries * 10) { taosMsleep(10); - } else if (remainTimeSeries > 500000) { + } else if (remainTimeSeries > nCmprTimeSeries * 5) { taosMsleep(5); } else { taosMsleep(2); @@ -110,20 +113,23 @@ static void *dmNotifyThreadFp(void *param) { notifyTimeStamp[tail] = taosGetTimestampNs(); ++nTotalNotify; - int64_t approximateTimeSeries = 0; + approximateTimeSeries = 0; if (nTotalNotify >= TIMESERIES_STASH_NUM) { + head = tail - TIMESERIES_STASH_NUM + 1; + if (head < 0) head += TIMESERIES_STASH_NUM; int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head]; int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head]; if (timeDiff > 0 && timeDiff < 1e9 && tsDiff > 0) { approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff; + if ((approximateTimeSeries * nDnode) > remainTimeSeries) { + dmSendNotifyReq(pMgmt, &req); + } } - } - if (++head == TIMESERIES_STASH_NUM) head = 0; - if (++tail == TIMESERIES_STASH_NUM) tail = 0; - - if ((approximateTimeSeries * nDnode) > remainTimeSeries) { + } else { dmSendNotifyReq(pMgmt, &req); } + if (++tail == TIMESERIES_STASH_NUM) tail = 0; + tFreeSNotifyReq(&req); lastNotify = taosGetTimestampMs(); _skip: From bc1f9bca93539978f751140a7d45f955c9b5f0a7 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sun, 28 Apr 2024 11:07:53 +0800 Subject: [PATCH 7/9] enh: batch create table --- source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 64588c6790..c9f8801ceb 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -68,6 +68,7 @@ static void *dmNotifyThreadFp(void *param) { bool wait = true; int32_t nDnode = 0; int64_t lastNotify = 0; + int64_t lastFetchDnode = 0; SNotifyReq req = {0}; while (1) { if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; @@ -79,9 +80,10 @@ static void *dmNotifyThreadFp(void *param) { goto _skip; } int64_t current = taosGetTimestampMs(); - if (current - lastNotify > 1000) { + if (current - lastFetchDnode > 1000) { nDnode = dmGetDnodeSize(pMgmt->pData); if (nDnode < 1) nDnode = 1; + lastFetchDnode = current; } if (req.dnodeId == 0 || req.clusterId == 0) { req.dnodeId = pMgmt->pData->dnodeId; From 4d572ebe88aa6241b4ec425ae273e6168855f777 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sun, 28 Apr 2024 11:25:40 +0800 Subject: [PATCH 8/9] enh: batch create table --- source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index c9f8801ceb..c48b614f96 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -121,9 +121,13 @@ static void *dmNotifyThreadFp(void *param) { if (head < 0) head += TIMESERIES_STASH_NUM; int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head]; int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head]; - if (timeDiff > 0 && timeDiff < 1e9 && tsDiff > 0) { - approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff; - if ((approximateTimeSeries * nDnode) > remainTimeSeries) { + if (tsDiff > 0) { + if (timeDiff > 0 && timeDiff < 1e9) { + approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff; + if ((approximateTimeSeries * nDnode) > remainTimeSeries) { + dmSendNotifyReq(pMgmt, &req); + } + } else { dmSendNotifyReq(pMgmt, &req); } } From 4eb1763b52bbc341e0f1e6c4e53fd9a6950ee26d Mon Sep 17 00:00:00 2001 From: kailixu Date: Sun, 28 Apr 2024 13:52:17 +0800 Subject: [PATCH 9/9] enh: batch create table --- source/dnode/mgmt/node_util/src/dmEps.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_util/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c index eccd556eea..c585a780ac 100644 --- a/source/dnode/mgmt/node_util/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -357,7 +357,7 @@ _OVER: int32_t dmGetDnodeSize(SDnodeData *pData) { int32_t size = 0; - taosThreadRwlockWrlock(&pData->lock); + taosThreadRwlockRdlock(&pData->lock); size = taosArrayGetSize(pData->dnodeEps); taosThreadRwlockUnlock(&pData->lock); return size;