opt msg on mnd

This commit is contained in:
yihaoDeng 2024-01-10 10:03:38 +08:00
parent 273e47a1fb
commit 9dbddeed16
2 changed files with 86 additions and 90 deletions

View File

@ -95,9 +95,9 @@ int32_t tsMonitorMaxLogs = 100;
bool tsMonitorComp = false; bool tsMonitorComp = false;
// audit // audit
bool tsEnableAudit = true; bool tsEnableAudit = true;
bool tsEnableAuditCreateTable = true; bool tsEnableAuditCreateTable = true;
int32_t tsAuditInterval = 5000; int32_t tsAuditInterval = 5000;
// telem // telem
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
@ -252,7 +252,7 @@ int32_t tsCompactPullupInterval = 10;
int32_t tsMqRebalanceInterval = 2; int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointInterval = 60; int32_t tsStreamCheckpointInterval = 60;
float tsSinkDataRate = 2.0; float tsSinkDataRate = 2.0;
int32_t tsStreamNodeCheckInterval = 15; int32_t tsStreamNodeCheckInterval = 16;
int32_t tsTtlUnit = 86400; int32_t tsTtlUnit = 86400;
int32_t tsTtlPushIntervalSec = 10; int32_t tsTtlPushIntervalSec = 10;
int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups
@ -282,7 +282,7 @@ int32_t tsS3BlockCacheSize = 16; // number of blocks
int32_t tsS3PageCacheSize = 4096; // number of pages int32_t tsS3PageCacheSize = 4096; // number of pages
int32_t tsS3UploadDelaySec = 60 * 60 * 24; int32_t tsS3UploadDelaySec = 60 * 60 * 24;
bool tsExperimental = true; bool tsExperimental = true;
#ifndef _STORAGE #ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) { int32_t taosSetTfsCfg(SConfig *pCfg) {
@ -691,8 +691,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH, CFG_DYN_ENT_SERVER) != 0) return -1;
@ -711,8 +710,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
0) 0)
return -1; return -1;
if (cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != if (cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER,
0) CFG_DYN_ENT_SERVER) != 0)
return -1; return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
0) 0)
@ -1465,40 +1464,38 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) {
{"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag}, {"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag},
}; };
static OptionNameAndVar options[] = { static OptionNameAndVar options[] = {{"audit", &tsEnableAudit},
{"audit", &tsEnableAudit}, {"asynclog", &tsAsyncLog},
{"asynclog", &tsAsyncLog}, {"disableStream", &tsDisableStream},
{"disableStream", &tsDisableStream}, {"enableWhiteList", &tsEnableWhiteList},
{"enableWhiteList", &tsEnableWhiteList}, {"telemetryReporting", &tsEnableTelem},
{"telemetryReporting", &tsEnableTelem}, {"monitor", &tsEnableMonitor},
{"monitor", &tsEnableMonitor},
{"mndSdbWriteDelta", &tsMndSdbWriteDelta}, {"mndSdbWriteDelta", &tsMndSdbWriteDelta},
{"minDiskFreeSize", &tsMinDiskFreeSize}, {"minDiskFreeSize", &tsMinDiskFreeSize},
{"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold}, {"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold},
{"checkpointInterval", &tsStreamCheckpointInterval}, {"checkpointInterval", &tsStreamCheckpointInterval},
{"keepAliveIdle", &tsKeepAliveIdle}, {"keepAliveIdle", &tsKeepAliveIdle},
{"logKeepDays", &tsLogKeepDays}, {"logKeepDays", &tsLogKeepDays},
{"maxStreamBackendCache", &tsMaxStreamBackendCache}, {"maxStreamBackendCache", &tsMaxStreamBackendCache},
{"mqRebalanceInterval", &tsMqRebalanceInterval}, {"mqRebalanceInterval", &tsMqRebalanceInterval},
{"numOfLogLines", &tsNumOfLogLines}, {"numOfLogLines", &tsNumOfLogLines},
{"queryRspPolicy", &tsQueryRspPolicy}, {"queryRspPolicy", &tsQueryRspPolicy},
{"timeseriesThreshold", &tsTimeSeriesThreshold}, {"timeseriesThreshold", &tsTimeSeriesThreshold},
{"tmqMaxTopicNum", &tmqMaxTopicNum}, {"tmqMaxTopicNum", &tmqMaxTopicNum},
{"transPullupInterval", &tsTransPullupInterval}, {"transPullupInterval", &tsTransPullupInterval},
{"compactPullupInterval", &tsCompactPullupInterval}, {"compactPullupInterval", &tsCompactPullupInterval},
{"trimVDbIntervalSec", &tsTrimVDbIntervalSec}, {"trimVDbIntervalSec", &tsTrimVDbIntervalSec},
{"ttlBatchDropNum", &tsTtlBatchDropNum}, {"ttlBatchDropNum", &tsTtlBatchDropNum},
{"ttlFlushThreshold", &tsTtlFlushThreshold}, {"ttlFlushThreshold", &tsTtlFlushThreshold},
{"ttlPushInterval", &tsTtlPushIntervalSec}, {"ttlPushInterval", &tsTtlPushIntervalSec},
//{"s3BlockSize", &tsS3BlockSize}, //{"s3BlockSize", &tsS3BlockSize},
{"s3BlockCacheSize", &tsS3BlockCacheSize}, {"s3BlockCacheSize", &tsS3BlockCacheSize},
{"s3PageCacheSize", &tsS3PageCacheSize}, {"s3PageCacheSize", &tsS3PageCacheSize},
{"s3UploadDelaySec", &tsS3UploadDelaySec}, {"s3UploadDelaySec", &tsS3UploadDelaySec},
{"supportVnodes", &tsNumOfSupportVnodes}, {"supportVnodes", &tsNumOfSupportVnodes},
{"experimental", &tsExperimental} {"experimental", &tsExperimental}};
};
if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) { if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) {
taosCfgSetOption(options, tListLen(options), pItem, false); taosCfgSetOption(options, tListLen(options), pItem, false);
@ -1692,36 +1689,34 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
{"cDebugFlag", &cDebugFlag}, {"dDebugFlag", &dDebugFlag}, {"fsDebugFlag", &fsDebugFlag}, {"cDebugFlag", &cDebugFlag}, {"dDebugFlag", &dDebugFlag}, {"fsDebugFlag", &fsDebugFlag},
{"idxDebugFlag", &idxDebugFlag}, {"jniDebugFlag", &jniDebugFlag}, {"qDebugFlag", &qDebugFlag}, {"idxDebugFlag", &idxDebugFlag}, {"jniDebugFlag", &jniDebugFlag}, {"qDebugFlag", &qDebugFlag},
{"rpcDebugFlag", &rpcDebugFlag}, {"smaDebugFlag", &smaDebugFlag}, {"tmrDebugFlag", &tmrDebugFlag}, {"rpcDebugFlag", &rpcDebugFlag}, {"smaDebugFlag", &smaDebugFlag}, {"tmrDebugFlag", &tmrDebugFlag},
{"uDebugFlag", &uDebugFlag}, {"simDebugFlag", &simDebugFlag}, {"uDebugFlag", &uDebugFlag}, {"simDebugFlag", &simDebugFlag},
}; };
static OptionNameAndVar options[] = { static OptionNameAndVar options[] = {{"asyncLog", &tsAsyncLog},
{"asyncLog", &tsAsyncLog}, {"assert", &tsAssert},
{"assert", &tsAssert}, {"compressMsgSize", &tsCompressMsgSize},
{"compressMsgSize", &tsCompressMsgSize}, {"countAlwaysReturnValue", &tsCountAlwaysReturnValue},
{"countAlwaysReturnValue", &tsCountAlwaysReturnValue}, {"crashReporting", &tsEnableCrashReport},
{"crashReporting", &tsEnableCrashReport}, {"enableCoreFile", &tsAsyncLog},
{"enableCoreFile", &tsAsyncLog}, {"enableQueryHb", &tsEnableQueryHb},
{"enableQueryHb", &tsEnableQueryHb}, {"keepColumnName", &tsKeepColumnName},
{"keepColumnName", &tsKeepColumnName}, {"keepAliveIdle", &tsKeepAliveIdle},
{"keepAliveIdle", &tsKeepAliveIdle}, {"logKeepDays", &tsLogKeepDays},
{"logKeepDays", &tsLogKeepDays}, {"maxInsertBatchRows", &tsMaxInsertBatchRows},
{"maxInsertBatchRows", &tsMaxInsertBatchRows}, {"maxRetryWaitTime", &tsMaxRetryWaitTime},
{"maxRetryWaitTime", &tsMaxRetryWaitTime}, {"minSlidingTime", &tsMinSlidingTime},
{"minSlidingTime", &tsMinSlidingTime}, {"minIntervalTime", &tsMinIntervalTime},
{"minIntervalTime", &tsMinIntervalTime}, {"numOfLogLines", &tsNumOfLogLines},
{"numOfLogLines", &tsNumOfLogLines}, {"querySmaOptimize", &tsQuerySmaOptimize},
{"querySmaOptimize", &tsQuerySmaOptimize}, {"queryPolicy", &tsQueryPolicy},
{"queryPolicy", &tsQueryPolicy}, {"queryPlannerTrace", &tsQueryPlannerTrace},
{"queryPlannerTrace", &tsQueryPlannerTrace}, {"queryNodeChunkSize", &tsQueryNodeChunkSize},
{"queryNodeChunkSize", &tsQueryNodeChunkSize}, {"queryUseNodeAllocator", &tsQueryUseNodeAllocator},
{"queryUseNodeAllocator", &tsQueryUseNodeAllocator}, {"smlDot2Underline", &tsSmlDot2Underline},
{"smlDot2Underline", &tsSmlDot2Underline}, {"shellActivityTimer", &tsShellActivityTimer},
{"shellActivityTimer", &tsShellActivityTimer}, {"slowLogThreshold", &tsSlowLogThreshold},
{"slowLogThreshold", &tsSlowLogThreshold}, {"useAdapter", &tsUseAdapter},
{"useAdapter", &tsUseAdapter}, {"experimental", &tsExperimental}};
{"experimental", &tsExperimental}
};
if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) { if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) {
taosCfgSetOption(options, tListLen(options), pItem, false); taosCfgSetOption(options, tListLen(options), pItem, false);
@ -1763,7 +1758,7 @@ static void taosCheckAndSetDebugFlag(int32_t *pFlagPtr, char *name, int32_t flag
void taosSetAllDebugFlag(int32_t flag) { void taosSetAllDebugFlag(int32_t flag) {
if (flag <= 0) return; if (flag <= 0) return;
SArray *noNeedToSetVars = NULL; SArray * noNeedToSetVars = NULL;
SConfigItem *pItem = cfgGetItem(tsCfg, "debugFlag"); SConfigItem *pItem = cfgGetItem(tsCfg, "debugFlag");
if (pItem != NULL) { if (pItem != NULL) {
pItem->i32 = flag; pItem->i32 = flag;
@ -1798,7 +1793,7 @@ void taosSetAllDebugFlag(int32_t flag) {
taosArrayClear(noNeedToSetVars); // reset array taosArrayClear(noNeedToSetVars); // reset array
uInfo("all debug flag are set to %d", flag); uInfo("all debug flag are set to %d", flag);
if (terrno == TSDB_CODE_CFG_NOT_FOUND) terrno = TSDB_CODE_SUCCESS; // ignore not exist if (terrno == TSDB_CODE_CFG_NOT_FOUND) terrno = TSDB_CODE_SUCCESS; // ignore not exist
} }
int8_t taosGranted() { return atomic_load_8(&tsGrant); } int8_t taosGranted() { return atomic_load_8(&tsGrant); }

View File

@ -107,7 +107,7 @@ static void *mndBuildCheckpointTickMsg(int32_t *pContLen, int64_t sec) {
static void mndPullupTrans(SMnode *pMnode) { static void mndPullupTrans(SMnode *pMnode) {
mTrace("pullup trans msg"); mTrace("pullup trans msg");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void * pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
@ -117,7 +117,7 @@ static void mndPullupTrans(SMnode *pMnode) {
static void mndPullupCompacts(SMnode *pMnode) { static void mndPullupCompacts(SMnode *pMnode) {
mTrace("pullup compact timer msg"); mTrace("pullup compact timer msg");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void * pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_COMPACT_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_COMPACT_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
@ -127,7 +127,7 @@ static void mndPullupCompacts(SMnode *pMnode) {
static void mndPullupTtl(SMnode *pMnode) { static void mndPullupTtl(SMnode *pMnode) {
mTrace("pullup ttl"); mTrace("pullup ttl");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void * pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
} }
@ -135,14 +135,14 @@ static void mndPullupTtl(SMnode *pMnode) {
static void mndPullupTrimDb(SMnode *pMnode) { static void mndPullupTrimDb(SMnode *pMnode) {
mTrace("pullup trim"); mTrace("pullup trim");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void * pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRIM_DB_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRIM_DB_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
} }
static void mndCalMqRebalance(SMnode *pMnode) { static void mndCalMqRebalance(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void * pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
@ -151,7 +151,7 @@ static void mndCalMqRebalance(SMnode *pMnode) {
static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildCheckpointTickMsg(&contLen, sec); void * pReq = mndBuildCheckpointTickMsg(&contLen, sec);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
@ -160,7 +160,7 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
static void mndStreamCheckpointRemain(SMnode *pMnode) { static void mndStreamCheckpointRemain(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildCheckpointTickMsg(&contLen, 0); void * pReq = mndBuildCheckpointTickMsg(&contLen, 0);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
@ -169,7 +169,7 @@ static void mndStreamCheckpointRemain(SMnode *pMnode) {
static void mndStreamCheckNode(SMnode *pMnode) { static void mndStreamCheckNode(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void * pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_NODECHECK_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_NODECHECK_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
@ -179,7 +179,7 @@ static void mndStreamCheckNode(SMnode *pMnode) {
static void mndPullupTelem(SMnode *pMnode) { static void mndPullupTelem(SMnode *pMnode) {
mTrace("pullup telem msg"); mTrace("pullup telem msg");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void * pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
@ -189,7 +189,7 @@ static void mndPullupTelem(SMnode *pMnode) {
static void mndPullupGrant(SMnode *pMnode) { static void mndPullupGrant(SMnode *pMnode) {
mTrace("pullup grant msg"); mTrace("pullup grant msg");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void * pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527}; .msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527};
@ -200,7 +200,7 @@ static void mndPullupGrant(SMnode *pMnode) {
static void mndIncreaseUpTime(SMnode *pMnode) { static void mndIncreaseUpTime(SMnode *pMnode) {
mTrace("increate uptime"); mTrace("increate uptime");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void * pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = TDMT_MND_UPTIME_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9528}; .msgType = TDMT_MND_UPTIME_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9528};
@ -254,7 +254,7 @@ static void mndCheckDnodeOffline(SMnode *pMnode) {
mTrace("check dnode offline"); mTrace("check dnode offline");
if (mndAcquireRpc(pMnode) != 0) return; if (mndAcquireRpc(pMnode) != 0) return;
SSdb *pSdb = pMnode->pSdb; SSdb * pSdb = pMnode->pSdb;
int64_t curMs = taosGetTimestampMs(); int64_t curMs = taosGetTimestampMs();
void *pIter = NULL; void *pIter = NULL;
@ -299,14 +299,14 @@ static bool mnodeIsNotLeader(SMnode *pMnode) {
} }
static int32_t minCronTime() { static int32_t minCronTime() {
int64_t min = INT64_MAX; int32_t min = INT32_MAX;
min = TMIN(min, tsTtlPushIntervalSec); min = TMIN(min, tsTtlPushIntervalSec);
min = TMIN(min, tsTrimVDbIntervalSec); min = TMIN(min, tsTrimVDbIntervalSec);
min = TMIN(min, tsTransPullupInterval); min = TMIN(min, tsTransPullupInterval);
min = TMIN(min, tsCompactPullupInterval); min = TMIN(min, tsCompactPullupInterval);
min = TMIN(min, tsMqRebalanceInterval); min = TMIN(min, tsMqRebalanceInterval);
min = TMIN(min, tsStreamCheckpointInterval); min = TMIN(min, tsStreamCheckpointInterval);
min = TMIN(min, 5); // checkpointRemain min = TMIN(min, 6); // checkpointRemain
min = TMIN(min, tsStreamNodeCheckInterval); min = TMIN(min, tsStreamNodeCheckInterval);
int64_t telemInt = TMIN(60, (tsTelemInterval - 1)); int64_t telemInt = TMIN(60, (tsTelemInterval - 1));
@ -386,7 +386,8 @@ static void *mndThreadFp(void *param) {
int64_t minCron = minCronTime(); int64_t minCron = minCronTime();
if (sec % minCron == 0 && mnodeIsNotLeader(pMnode)) { if (sec % minCron == 0 && mnodeIsNotLeader(pMnode)) {
// not leader, do nothing // not leader, do nothing
mTrace("timer not process since mnode is not leader, reason: %s", tstrerror(terrno)) terrno = 0; mTrace("timer not process since mnode is not leader, reason: %s", tstrerror(terrno));
terrno = 0;
continue; continue;
} }
mndDoTimerPullupTask(pMnode, sec); mndDoTimerPullupTask(pMnode, sec);
@ -700,7 +701,7 @@ void mndStop(SMnode *pMnode) {
} }
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode * pMnode = pMsg->info.node;
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
@ -802,7 +803,7 @@ _OVER:
} }
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode * pMnode = pMsg->info.node;
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
@ -857,7 +858,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) { SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
if (mndAcquireRpc(pMnode) != 0) return -1; if (mndAcquireRpc(pMnode) != 0) return -1;
SSdb *pSdb = pMnode->pSdb; SSdb * pSdb = pMnode->pSdb;
int64_t ms = taosGetTimestampMs(); int64_t ms = taosGetTimestampMs();
pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc)); pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
@ -941,7 +942,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries; pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
tstrncpy(desc.status, "unsynced", sizeof(desc.status)); tstrncpy(desc.status, "unsynced", sizeof(desc.status));
for (int32_t i = 0; i < pVgroup->replica; ++i) { for (int32_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SVnodeGid * pVgid = &pVgroup->vnodeGid[i];
SMonVnodeDesc *pVnDesc = &desc.vnodes[i]; SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
pVnDesc->dnode_id = pVgid->dnodeId; pVnDesc->dnode_id = pVgid->dnodeId;
tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role)); tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role));