From 03aea454d7838018576225bffd37e5d358726671 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 15 Sep 2023 14:33:35 +0800 Subject: [PATCH] chore: more code --- source/dnode/mgmt/mgmt_dnode/src/dmInt.c | 3 +++ source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 14 ++++++++++++++ source/dnode/mnode/impl/inc/mndGrant.h | 2 ++ source/dnode/mnode/impl/src/mndDnode.c | 10 +++++----- source/dnode/mnode/impl/src/mndMain.c | 11 +++++++---- source/dnode/vnode/inc/vnode.h | 1 - source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/meta/metaQuery.c | 7 ++----- source/dnode/vnode/src/vnd/vnodeQuery.c | 2 +- 9 files changed, 35 insertions(+), 17 deletions(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 09783a5ea9..ab430fa60c 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -26,6 +26,9 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { if (dmStartCrashReportThread(pMgmt) != 0) { return -1; } + if (dmStartNotifyThread(pMgmt) != 0) { + return -1; + } return 0; } diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 5c08bc4dbc..08816749bf 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -134,6 +134,20 @@ static void *dmCrashReportThreadFp(void *param) { } +int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) { + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) { + dError("failed to create status thread since %s", strerror(errno)); + return -1; + } + + taosThreadAttrDestroy(&thAttr); + tmsgReportStartup("dnode-status", "initialized"); + return 0; +} + int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) { TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); diff --git a/source/dnode/mnode/impl/inc/mndGrant.h b/source/dnode/mnode/impl/inc/mndGrant.h index 88f118cb8f..a036bf6d7e 100644 --- a/source/dnode/mnode/impl/inc/mndGrant.h +++ b/source/dnode/mnode/impl/inc/mndGrant.h @@ -22,6 +22,8 @@ #include "mndInt.h" +#define GRANT_HB_INTERVAL 300 // 300 seconds + int32_t mndInitGrant(SMnode * pMnode); void mndCleanupGrant(); void grantParseParameter(); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 7997b31e76..3e8d64b2a5 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -51,7 +51,7 @@ enum { DND_CONN_ACTIVE_CODE, }; -extern int32_t mndUpdateClusterInfo(SRpcMsg *pReq); +int32_t mndUpdateClusterInfo(SRpcMsg *pReq); static int32_t mndCreateDefaultDnode(SMnode *pMnode); static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode); static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw); @@ -472,6 +472,10 @@ static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) { return stateChanged; } +#ifndef TD_ENTERPRISE +int32_t mndUpdateClusterInfo(SRpcMsg *pReq) { return 0; } +#endif + static int32_t mndProcessStatusReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStatusReq statusReq = {0}; @@ -671,10 +675,6 @@ _OVER: return code; } -#ifndef TD_ENTERPRISE -int32_t mndUpdateClusterInfo(SRpcMsg *pReq) { return 0; } -#endif - static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) { int32_t code = -1; SSdbRaw *pRaw = NULL; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 1c87cde78a..e00af5375d 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -262,6 +262,13 @@ static void *mndThreadFp(void *param) { lastTime++; taosMsleep(100); if (mndGetStop(pMnode)) break; + + if (atomic_val_compare_exchange_32(&tsGrantHBInterval, -GRANT_HB_INTERVAL, GRANT_HB_INTERVAL) == + -GRANT_HB_INTERVAL || + (lastTime % (tsGrantHBInterval * 10) == 0)) { + mndPullupGrant(pMnode); + } + if (lastTime % 10 != 0) continue; int64_t sec = lastTime / 10; @@ -293,10 +300,6 @@ static void *mndThreadFp(void *param) { mndPullupTelem(pMnode); } - if (sec % tsGrantHBInterval == 0) { - mndPullupGrant(pMnode); - } - if (sec % tsUptimeInterval == 0) { mndIncreaseUpTime(pMnode); } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e15f5f911d..5ae257aef8 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -288,7 +288,6 @@ typedef struct { int64_t numOfSTables; int64_t numOfCTables; int64_t numOfNTables; - int64_t numOfCmprTables; int64_t numOfNTimeSeries; int64_t numOfTimeSeries; int64_t itvTimeSeries; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 536273c044..6eda33424f 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -168,7 +168,7 @@ int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid); int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq); -int64_t metaGetTimeSeriesNum(SMeta* pMeta); +int64_t metaGetTimeSeriesNum(SMeta* pMeta, int type); SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock); int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first); void metaPauseCtbCursor(SMCtbCursor* pCtbCur); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 903f7d4cce..c7b7ce9686 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -697,18 +697,15 @@ int64_t metaGetTbNum(SMeta *pMeta) { } // N.B. Called by statusReq per second -int64_t metaGetTimeSeriesNum(SMeta *pMeta) { +int64_t metaGetTimeSeriesNum(SMeta *pMeta, int type) { // sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column) - int64_t nTables = metaGetTbNum(pMeta); - if (nTables - pMeta->pVnode->config.vndStats.numOfCmprTables > 0 || - pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 || + if (type || pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 || ++pMeta->pVnode->config.vndStats.itvTimeSeries % (60 * 5) == 0) { int64_t num = 0; vnodeGetTimeSeriesNum(pMeta->pVnode, &num); pMeta->pVnode->config.vndStats.numOfTimeSeries = num; pMeta->pVnode->config.vndStats.itvTimeSeries = (TD_VID(pMeta->pVnode) % 100) * 2; - pMeta->pVnode->config.vndStats.numOfCmprTables = nTables; } return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 01dd062866..16a21ca405 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -388,7 +388,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode); pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); - pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta); + pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 0); pLoad->totalStorage = (int64_t)3 * 1073741824; pLoad->compStorage = (int64_t)2 * 1073741824; pLoad->pointsWritten = 100;