diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 6c8448a6be..3b8929f241 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -178,7 +178,6 @@ extern int32_t tsMetaCacheMaxSize; extern int32_t tsSlowLogThreshold; extern int32_t tsSlowLogScope; extern int32_t tsTimeSeriesThreshold; -extern int64_t tsTimeSeriesInterval; extern bool tsMultiResultFunctionStarReturnTags; // client diff --git a/include/common/tgrant.h b/include/common/tgrant.h index c1e37787c2..f7759177da 100644 --- a/include/common/tgrant.h +++ b/include/common/tgrant.h @@ -58,11 +58,12 @@ typedef enum { TSDB_GRANT_BACKUP_RESTORE, } EGrantType; -int32_t checkAndGetCryptKey(const char *encryptCode, const char *machineId, char **key); -int32_t generateEncryptCode(const char *key, const char *machineId, char **encryptCode); -int32_t grantCheck(EGrantType grant); -int32_t grantCheckExpire(EGrantType grant); -char *tGetMachineId(); +int32_t checkAndGetCryptKey(const char *encryptCode, const char *machineId, char **key); +int32_t generateEncryptCode(const char *key, const char *machineId, char **encryptCode); +uint64_t grantRemain(EGrantType grant); +int32_t grantCheck(EGrantType grant); +int32_t grantCheckExpire(EGrantType grant); +char *tGetMachineId(); // #ifndef GRANTS_CFG #ifdef TD_ENTERPRISE diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 1ac762e004..ba96dc0adf 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -181,7 +181,6 @@ int32_t tsMetaCacheMaxSize = -1; // MB int32_t tsSlowLogThreshold = 3; // seconds int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL; int32_t tsTimeSeriesThreshold = 50; -int64_t tsTimeSeriesInterval = 10; // ms bool tsMultiResultFunctionStarReturnTags = false; /* @@ -784,8 +783,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; - if (cfgAddInt64(pCfg, "timeseriesInterval", tsTimeSeriesInterval, 1, 100, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) - return -1; if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) @@ -1242,7 +1239,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32; tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32; tsTimeSeriesThreshold = cfgGetItem(pCfg, "timeseriesThreshold")->i32; - tsTimeSeriesInterval = cfgGetItem(pCfg, "timeseriesInterval")->i64; tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64; @@ -1561,7 +1557,6 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"numOfLogLines", &tsNumOfLogLines}, {"queryRspPolicy", &tsQueryRspPolicy}, {"timeseriesThreshold", &tsTimeSeriesThreshold}, - {"timeseriesInterval", &tsTimeSeriesInterval}, {"tmqMaxTopicNum", &tmqMaxTopicNum}, {"tmqRowSize", &tmqRowSize}, {"transPullupInterval", &tsTransPullupInterval}, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 45b0b6ac2b..3836f13a2f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1161,16 +1161,15 @@ int32_t tDeserializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) { int32_t nVgroup = 0; if (tDecodeI32(&decoder, &nVgroup) < 0) goto _exit; if (nVgroup > 0) { - pReq->pVloads = taosArrayInit(nVgroup, sizeof(SVnodeLoadLite)); + pReq->pVloads = taosArrayInit_s(sizeof(SVnodeLoadLite), nVgroup); if (!pReq->pVloads) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } for (int32_t i = 0; i < nVgroup; ++i) { - SVnodeLoadLite vload; - if (tDecodeI32(&decoder, &(vload.vgId)) < 0) goto _exit; - if (tDecodeI64(&decoder, &(vload.nTimeSeries)) < 0) goto _exit; - taosArrayPush(pReq->pVloads, &vload); + SVnodeLoadLite *vload = TARRAY_GET_ELEM(pReq->pVloads, i); + if (tDecodeI32(&decoder, &(vload->vgId)) < 0) goto _exit; + if (tDecodeI64(&decoder, &(vload->nTimeSeries)) < 0) goto _exit; } } diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index 9e1fe69714..46f8dd06d4 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -49,7 +49,7 @@ typedef struct SDnodeMgmt { // dmHandle.c SArray *dmGetMsgHandles(); void dmSendStatusReq(SDnodeMgmt *pMgmt); -void dmSendNotifyReq(SDnodeMgmt *pMgmt); +void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq); int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 56fdb463c4..91d73c9dd7 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -169,22 +169,10 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { dmProcessStatusRsp(pMgmt, &rpcRsp); } -void dmSendNotifyReq(SDnodeMgmt *pMgmt) { - SNotifyReq req = {0}; - - taosThreadRwlockRdlock(&pMgmt->pData->lock); - req.dnodeId = pMgmt->pData->dnodeId; - taosThreadRwlockUnlock(&pMgmt->pData->lock); - - req.clusterId = pMgmt->pData->clusterId; - - SMonVloadInfo vinfo = {0}; - (*pMgmt->getVnodeLoadsLiteFp)(&vinfo); - req.pVloads = vinfo.pVloads; - - int32_t contLen = tSerializeSNotifyReq(NULL, 0, &req); - void * pHead = rpcMallocCont(contLen); - tSerializeSNotifyReq(pHead, contLen, &req); +void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) { + int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq); + void *pHead = rpcMallocCont(contLen); + tSerializeSNotifyReq(pHead, contLen, pReq); tFreeSNotifyReq(&req); SRpcMsg rpcMsg = {.pCont = pHead, diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 934d4595ad..48a0c6b755 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "dmInt.h" +#include "tgrant.h" #include "thttp.h" static void *dmStatusThreadFp(void *param) { @@ -47,26 +48,85 @@ static void *dmStatusThreadFp(void *param) { } SDmNotifyHandle dmNotifyHdl = {.state = 0}; - +#define TIMESERIES_STASH_NUM 5 static void *dmNotifyThreadFp(void *param) { SDnodeMgmt *pMgmt = param; + int64_t lastTime = taosGetTimestampMs(); setThreadName("dnode-notify"); if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) { return NULL; } - bool wait = true; - int64_t lastNotify = 0; + // calculate approximate timeSeries per second + int64_t notifyTimeStamp[TIMESERIES_STASH_NUM]; + int64_t notifyTimeSeries[TIMESERIES_STASH_NUM]; + uint64_t nTotalNotify = 0; + int32_t head = -1; + int32_t tail = 0; + + bool wait = true; + int32_t nDnode = 0; + int64_t lastNotify = 0; + SNotifyReq req = {0}; while (1) { if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; if (wait) tsem_wait(&dmNotifyHdl.sem); atomic_store_8(&dmNotifyHdl.state, 1); - if (taosGetTimestampMs() - lastNotify < tsTimeSeriesInterval) { - taosMsleep(tsTimeSeriesInterval); + + uint64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES); + if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) { + goto _skip; } - dmSendNotifyReq(pMgmt); + int64_t current = taosGetTimestampMs(); + if (current - lastNotify > 1000) { + nDnode = dmGetDnodeSize(pMgmt->pData); + } + if (req.dnodeId == 0 || req.clusterId == 0) { + req.dnodeId = pMgmt->pData->dnodeId; + req.clusterId = pMgmt->pData->clusterId; + } + + if (current - lastNotify < 10) { + if (remainTimeSeries > 1000000) { + taosMsleep(10); + } else if (remainTimeSeries > 500000) { + taosMsleep(5); + } else { + taosMsleep(2); + } + } + + SMonVloadInfo vinfo = {0}; + (*pMgmt->getVnodeLoadsLiteFp)(&vinfo); + req.pVloads = vinfo.pVloads; + int32_t nVgroup = taosArrayGetSize(req.pVloads); + int64_t nTimeSeries = 0; + for (int32_t i = 0; i < nVgroup; ++i) { + SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i); + nTimeSeries += vload->nTimeSeries; + } + notifyTimeSeries[tail] = nTimeSeries; + notifyTimeStamp[tail] = taosGetTimestampNs(); + ++nTotalNotify; + + uint64_t approximateTimeSeries = 0; + if (nTotalNotify >= TIMESERIES_STASH_NUM) { + int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head]; + int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head]; + if (timeDiff > 0 && timeDiff < 1e9 && tsDiff > 0) { + approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff; + } + } + if (++head == TIMESERIES_STASH_NUM) head = 0; + if (++tail == TIMESERIES_STASH_NUM) tail = 0; + + if ((approximateTimeSeries * nDnode) > remainTimeSeries) { + dmSendNotifyReq(pMgmt, &req); + } + tFreeSNotifyReq(&req); lastNotify = taosGetTimestampMs(); + _skip: if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) { wait = true; continue; diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 0344ca685d..9fdd5e50ed 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -182,6 +182,7 @@ TdFilePtr dmCheckRunning(const char *dataDir); int32_t dmInitDndInfo(SDnodeData *pData); // dmEps.c +int32_t dmGetDnodeSize(SDnodeData *pData); int32_t dmReadEps(SDnodeData *pData); int32_t dmWriteEps(SDnodeData *pData); void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps); diff --git a/source/dnode/mgmt/node_util/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c index 4b41b17cb1..eccd556eea 100644 --- a/source/dnode/mgmt/node_util/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -355,6 +355,14 @@ _OVER: return code; } +int32_t dmGetDnodeSize(SDnodeData *pData) { + int32_t size = 0; + taosThreadRwlockWrlock(&pData->lock); + size = taosArrayGetSize(pData->dnodeEps); + taosThreadRwlockUnlock(&pData->lock); + return size; +} + void dmUpdateEps(SDnodeData *pData, SArray *eps) { taosThreadRwlockWrlock(&pData->lock); dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer); diff --git a/source/dnode/mnode/impl/src/mndGrant.c b/source/dnode/mnode/impl/src/mndGrant.c index cce386785a..8dae4b3c11 100644 --- a/source/dnode/mnode/impl/src/mndGrant.c +++ b/source/dnode/mnode/impl/src/mndGrant.c @@ -77,6 +77,7 @@ void grantParseParameter() { mError("can't parsed parameter k"); } void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value) {} void grantAdd(EGrantType grant, uint64_t value) {} void grantRestore(EGrantType grant, uint64_t value) {} +int64_t grantRemain(EGrantType grant) { return 0; } char *tGetMachineId() { return NULL; }; int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } diff --git a/tests/system-test/0-others/test_hot_refresh_configurations.py b/tests/system-test/0-others/test_hot_refresh_configurations.py index 3972ecb56b..71f6290469 100644 --- a/tests/system-test/0-others/test_hot_refresh_configurations.py +++ b/tests/system-test/0-others/test_hot_refresh_configurations.py @@ -96,12 +96,6 @@ class TDTestCase: "values": [0, 200, 2000], "except_values": [-2, 2001] }, - { - "name": "timeseriesInterval", - "alias": "tsTimeSeriesInterval", - "values": [1, 10, 100], - "except_values": [-1, 0, 101] - }, { "name": "minDiskFreeSize", "alias": "tsMinDiskFreeSize",