chore: more code
This commit is contained in:
parent
7b29d62f04
commit
03aea454d7
|
@ -26,6 +26,9 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
|
||||||
if (dmStartCrashReportThread(pMgmt) != 0) {
|
if (dmStartCrashReportThread(pMgmt) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
if (dmStartNotifyThread(pMgmt) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -134,6 +134,20 @@ static void *dmCrashReportThreadFp(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
|
||||||
|
TdThreadAttr thAttr;
|
||||||
|
taosThreadAttrInit(&thAttr);
|
||||||
|
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
|
||||||
|
dError("failed to create status thread since %s", strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadAttrDestroy(&thAttr);
|
||||||
|
tmsgReportStartup("dnode-status", "initialized");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
|
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
|
||||||
TdThreadAttr thAttr;
|
TdThreadAttr thAttr;
|
||||||
taosThreadAttrInit(&thAttr);
|
taosThreadAttrInit(&thAttr);
|
||||||
|
|
|
@ -22,6 +22,8 @@
|
||||||
|
|
||||||
#include "mndInt.h"
|
#include "mndInt.h"
|
||||||
|
|
||||||
|
#define GRANT_HB_INTERVAL 300 // 300 seconds
|
||||||
|
|
||||||
int32_t mndInitGrant(SMnode * pMnode);
|
int32_t mndInitGrant(SMnode * pMnode);
|
||||||
void mndCleanupGrant();
|
void mndCleanupGrant();
|
||||||
void grantParseParameter();
|
void grantParseParameter();
|
||||||
|
|
|
@ -51,7 +51,7 @@ enum {
|
||||||
DND_CONN_ACTIVE_CODE,
|
DND_CONN_ACTIVE_CODE,
|
||||||
};
|
};
|
||||||
|
|
||||||
extern int32_t mndUpdateClusterInfo(SRpcMsg *pReq);
|
int32_t mndUpdateClusterInfo(SRpcMsg *pReq);
|
||||||
static int32_t mndCreateDefaultDnode(SMnode *pMnode);
|
static int32_t mndCreateDefaultDnode(SMnode *pMnode);
|
||||||
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
|
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
|
||||||
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
|
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
|
||||||
|
@ -472,6 +472,10 @@ static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
|
||||||
return stateChanged;
|
return stateChanged;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifndef TD_ENTERPRISE
|
||||||
|
int32_t mndUpdateClusterInfo(SRpcMsg *pReq) { return 0; }
|
||||||
|
#endif
|
||||||
|
|
||||||
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SStatusReq statusReq = {0};
|
SStatusReq statusReq = {0};
|
||||||
|
@ -671,10 +675,6 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifndef TD_ENTERPRISE
|
|
||||||
int32_t mndUpdateClusterInfo(SRpcMsg *pReq) { return 0; }
|
|
||||||
#endif
|
|
||||||
|
|
||||||
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
|
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SSdbRaw *pRaw = NULL;
|
SSdbRaw *pRaw = NULL;
|
||||||
|
|
|
@ -262,6 +262,13 @@ static void *mndThreadFp(void *param) {
|
||||||
lastTime++;
|
lastTime++;
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
if (mndGetStop(pMnode)) break;
|
if (mndGetStop(pMnode)) break;
|
||||||
|
|
||||||
|
if (atomic_val_compare_exchange_32(&tsGrantHBInterval, -GRANT_HB_INTERVAL, GRANT_HB_INTERVAL) ==
|
||||||
|
-GRANT_HB_INTERVAL ||
|
||||||
|
(lastTime % (tsGrantHBInterval * 10) == 0)) {
|
||||||
|
mndPullupGrant(pMnode);
|
||||||
|
}
|
||||||
|
|
||||||
if (lastTime % 10 != 0) continue;
|
if (lastTime % 10 != 0) continue;
|
||||||
|
|
||||||
int64_t sec = lastTime / 10;
|
int64_t sec = lastTime / 10;
|
||||||
|
@ -293,10 +300,6 @@ static void *mndThreadFp(void *param) {
|
||||||
mndPullupTelem(pMnode);
|
mndPullupTelem(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sec % tsGrantHBInterval == 0) {
|
|
||||||
mndPullupGrant(pMnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sec % tsUptimeInterval == 0) {
|
if (sec % tsUptimeInterval == 0) {
|
||||||
mndIncreaseUpTime(pMnode);
|
mndIncreaseUpTime(pMnode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -288,7 +288,6 @@ typedef struct {
|
||||||
int64_t numOfSTables;
|
int64_t numOfSTables;
|
||||||
int64_t numOfCTables;
|
int64_t numOfCTables;
|
||||||
int64_t numOfNTables;
|
int64_t numOfNTables;
|
||||||
int64_t numOfCmprTables;
|
|
||||||
int64_t numOfNTimeSeries;
|
int64_t numOfNTimeSeries;
|
||||||
int64_t numOfTimeSeries;
|
int64_t numOfTimeSeries;
|
||||||
int64_t itvTimeSeries;
|
int64_t itvTimeSeries;
|
||||||
|
|
|
@ -168,7 +168,7 @@ int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid);
|
||||||
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||||
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
|
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
|
||||||
|
|
||||||
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
|
int64_t metaGetTimeSeriesNum(SMeta* pMeta, int type);
|
||||||
SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock);
|
SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock);
|
||||||
int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first);
|
int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first);
|
||||||
void metaPauseCtbCursor(SMCtbCursor* pCtbCur);
|
void metaPauseCtbCursor(SMCtbCursor* pCtbCur);
|
||||||
|
|
|
@ -697,18 +697,15 @@ int64_t metaGetTbNum(SMeta *pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// N.B. Called by statusReq per second
|
// N.B. Called by statusReq per second
|
||||||
int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
|
int64_t metaGetTimeSeriesNum(SMeta *pMeta, int type) {
|
||||||
// sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column)
|
// sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column)
|
||||||
int64_t nTables = metaGetTbNum(pMeta);
|
if (type || pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 ||
|
||||||
if (nTables - pMeta->pVnode->config.vndStats.numOfCmprTables > 0 ||
|
|
||||||
pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 ||
|
|
||||||
++pMeta->pVnode->config.vndStats.itvTimeSeries % (60 * 5) == 0) {
|
++pMeta->pVnode->config.vndStats.itvTimeSeries % (60 * 5) == 0) {
|
||||||
int64_t num = 0;
|
int64_t num = 0;
|
||||||
vnodeGetTimeSeriesNum(pMeta->pVnode, &num);
|
vnodeGetTimeSeriesNum(pMeta->pVnode, &num);
|
||||||
pMeta->pVnode->config.vndStats.numOfTimeSeries = num;
|
pMeta->pVnode->config.vndStats.numOfTimeSeries = num;
|
||||||
|
|
||||||
pMeta->pVnode->config.vndStats.itvTimeSeries = (TD_VID(pMeta->pVnode) % 100) * 2;
|
pMeta->pVnode->config.vndStats.itvTimeSeries = (TD_VID(pMeta->pVnode) % 100) * 2;
|
||||||
pMeta->pVnode->config.vndStats.numOfCmprTables = nTables;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries;
|
return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries;
|
||||||
|
|
|
@ -388,7 +388,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
||||||
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
|
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
|
||||||
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
|
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
|
||||||
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
||||||
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
|
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 0);
|
||||||
pLoad->totalStorage = (int64_t)3 * 1073741824;
|
pLoad->totalStorage = (int64_t)3 * 1073741824;
|
||||||
pLoad->compStorage = (int64_t)2 * 1073741824;
|
pLoad->compStorage = (int64_t)2 * 1073741824;
|
||||||
pLoad->pointsWritten = 100;
|
pLoad->pointsWritten = 100;
|
||||||
|
|
Loading…
Reference in New Issue