Merge pull request #16341 from taosdata/fix/dnode
enh: increase startup time monitoring
This commit is contained in:
commit
8d511d4837
|
@ -130,6 +130,7 @@ extern int32_t tsMqRebalanceInterval;
|
|||
extern int32_t tsTtlUnit;
|
||||
extern int32_t tsTtlPushInterval;
|
||||
extern int32_t tsGrantHBInterval;
|
||||
extern int32_t tsUptimeInterval;
|
||||
|
||||
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
|
||||
|
||||
|
|
|
@ -170,6 +170,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||
|
|
|
@ -66,8 +66,9 @@ static const SSysDbTableSchema bnodesSchema[] = {
|
|||
};
|
||||
|
||||
static const SSysDbTableSchema clusterSchema[] = {
|
||||
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||
{.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "uptime", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||
};
|
||||
|
||||
|
|
|
@ -164,6 +164,7 @@ int32_t tsMqRebalanceInterval = 2;
|
|||
int32_t tsTtlUnit = 86400;
|
||||
int32_t tsTtlPushInterval = 86400;
|
||||
int32_t tsGrantHBInterval = 60;
|
||||
int32_t tsUptimeInterval = 300; // seconds
|
||||
|
||||
#ifndef _STORAGE
|
||||
int32_t taosSetTfsCfg(SConfig *pCfg) {
|
||||
|
|
|
@ -27,6 +27,7 @@ void mndCleanupCluster(SMnode *pMnode);
|
|||
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len);
|
||||
int64_t mndGetClusterId(SMnode *pMnode);
|
||||
int64_t mndGetClusterCreateTime(SMnode *pMnode);
|
||||
float mndGetClusterUpTime(SMnode *pMnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -179,6 +179,7 @@ typedef struct {
|
|||
char name[TSDB_CLUSTER_ID_LEN];
|
||||
int64_t createdTime;
|
||||
int64_t updateTime;
|
||||
int32_t upTime;
|
||||
} SClusterObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#include "mndTrans.h"
|
||||
|
||||
#define CLUSTER_VER_NUMBE 1
|
||||
#define CLUSTER_RESERVE_SIZE 64
|
||||
#define CLUSTER_RESERVE_SIZE 60
|
||||
|
||||
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster);
|
||||
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw);
|
||||
|
@ -29,6 +29,7 @@ static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SCl
|
|||
static int32_t mndCreateDefaultCluster(SMnode *pMnode);
|
||||
static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
|
||||
static int32_t mndProcessUptimeTimer(SRpcMsg *pReq);
|
||||
|
||||
int32_t mndInitCluster(SMnode *pMnode) {
|
||||
SSdbTable table = {
|
||||
|
@ -42,8 +43,10 @@ int32_t mndInitCluster(SMnode *pMnode) {
|
|||
.deleteFp = (SdbDeleteFp)mndClusterActionDelete,
|
||||
};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_UPTIME_TIMER, mndProcessUptimeTimer);
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster);
|
||||
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
|
@ -62,40 +65,69 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int64_t mndGetClusterId(SMnode *pMnode) {
|
||||
static SClusterObj *mndAcquireCluster(SMnode *pMnode) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
int64_t clusterId = -1;
|
||||
|
||||
while (1) {
|
||||
SClusterObj *pCluster = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pCluster);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
clusterId = pCluster->id;
|
||||
return pCluster;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void mndReleaseCluster(SMnode *pMnode, SClusterObj *pCluster) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbRelease(pSdb, pCluster);
|
||||
}
|
||||
|
||||
int64_t mndGetClusterId(SMnode *pMnode) {
|
||||
int64_t clusterId = 0;
|
||||
SClusterObj *pCluster = mndAcquireCluster(pMnode);
|
||||
if (pCluster != NULL) {
|
||||
clusterId = pCluster->id;
|
||||
mndReleaseCluster(pMnode, pCluster);
|
||||
}
|
||||
|
||||
return clusterId;
|
||||
}
|
||||
|
||||
int64_t mndGetClusterCreateTime(SMnode *pMnode) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
int64_t createTime = INT64_MAX;
|
||||
|
||||
while (1) {
|
||||
SClusterObj *pCluster = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pCluster);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
int64_t createTime = 0;
|
||||
SClusterObj *pCluster = mndAcquireCluster(pMnode);
|
||||
if (pCluster != NULL) {
|
||||
createTime = pCluster->createdTime;
|
||||
sdbRelease(pSdb, pCluster);
|
||||
mndReleaseCluster(pMnode, pCluster);
|
||||
}
|
||||
|
||||
return createTime;
|
||||
}
|
||||
|
||||
static int32_t mndGetClusterUpTimeImp(SClusterObj *pCluster) {
|
||||
#if 0
|
||||
int32_t upTime = taosGetTimestampSec() - pCluster->updateTime / 1000;
|
||||
upTime = upTime + pCluster->upTime;
|
||||
return upTime;
|
||||
#else
|
||||
return pCluster->upTime;
|
||||
#endif
|
||||
}
|
||||
|
||||
float mndGetClusterUpTime(SMnode *pMnode) {
|
||||
int64_t upTime = 0;
|
||||
SClusterObj *pCluster = mndAcquireCluster(pMnode);
|
||||
if (pCluster != NULL) {
|
||||
upTime = mndGetClusterUpTimeImp(pCluster);
|
||||
mndReleaseCluster(pMnode, pCluster);
|
||||
}
|
||||
|
||||
return upTime / 86400.0f;
|
||||
}
|
||||
|
||||
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
|
@ -107,6 +139,7 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
|
|||
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pCluster->upTime, _OVER)
|
||||
SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
|
||||
|
||||
terrno = 0;
|
||||
|
@ -144,6 +177,7 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pCluster->upTime, _OVER)
|
||||
SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
|
||||
|
||||
terrno = 0;
|
||||
|
@ -162,6 +196,7 @@ _OVER:
|
|||
static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) {
|
||||
mTrace("cluster:%" PRId64 ", perform insert action, row:%p", pCluster->id, pCluster);
|
||||
pSdb->pMnode->clusterId = pCluster->id;
|
||||
pCluster->updateTime = taosGetTimestampMs();
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -171,7 +206,10 @@ static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) {
|
|||
}
|
||||
|
||||
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOld, SClusterObj *pNew) {
|
||||
mTrace("cluster:%" PRId64 ", perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
|
||||
mTrace("cluster:%" PRId64 ", perform update action, old row:%p new row:%p, uptime from %d to %d", pOld->id, pOld,
|
||||
pNew, pOld->upTime, pNew->upTime);
|
||||
pOld->upTime = pNew->upTime;
|
||||
pOld->updateTime = taosGetTimestampMs();
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -242,6 +280,10 @@ static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, buf, false);
|
||||
|
||||
int32_t upTime = mndGetClusterUpTimeImp(pCluster);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&upTime, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pCluster->createdTime, false);
|
||||
|
||||
|
@ -257,3 +299,40 @@ static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter) {
|
|||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
||||
static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SClusterObj clusterObj = {0};
|
||||
SClusterObj *pCluster = mndAcquireCluster(pMnode);
|
||||
if (pCluster != NULL) {
|
||||
memcpy(&clusterObj, pCluster, sizeof(SClusterObj));
|
||||
clusterObj.upTime += tsUptimeInterval;
|
||||
mndReleaseCluster(pMnode, pCluster);
|
||||
}
|
||||
|
||||
if (clusterObj.id <= 0) {
|
||||
mError("can't get cluster info while update uptime");
|
||||
return 0;
|
||||
}
|
||||
|
||||
mTrace("update cluster uptime to %" PRId64, clusterObj.upTime);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) return -1;
|
||||
|
||||
SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -100,6 +100,16 @@ static void mndGrantHeartBeat(SMnode *pMnode) {
|
|||
}
|
||||
}
|
||||
|
||||
static void mndIncreaseUpTime(SMnode *pMnode) {
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildTimerMsg(&contLen);
|
||||
if (pReq != NULL) {
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TDMT_MND_UPTIME_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9528};
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
}
|
||||
}
|
||||
|
||||
static void *mndThreadFp(void *param) {
|
||||
SMnode *pMnode = param;
|
||||
int64_t lastTime = 0;
|
||||
|
@ -129,6 +139,10 @@ static void *mndThreadFp(void *param) {
|
|||
if (lastTime % (tsGrantHBInterval * 10) == 0) {
|
||||
mndGrantHeartBeat(pMnode);
|
||||
}
|
||||
|
||||
if ((lastTime % (tsUptimeInterval * 10)) == ((tsUptimeInterval - 1) * 10)) {
|
||||
mndIncreaseUpTime(pMnode);
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
@ -556,7 +570,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
|
|||
}
|
||||
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
|
||||
if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
|
||||
pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER) {
|
||||
pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
|
||||
pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -705,7 +720,8 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
|||
if (pObj->id == pMnode->selfDnodeId) {
|
||||
pClusterInfo->first_ep_dnode_id = pObj->id;
|
||||
tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
|
||||
pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
|
||||
pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
|
||||
// pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
|
||||
tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
|
||||
} else {
|
||||
tstrncpy(desc.role, syncStr(pObj->state), sizeof(desc.role));
|
||||
|
|
|
@ -68,7 +68,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
|
|||
if (pMgmt->errCode != 0) {
|
||||
mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
|
||||
} else {
|
||||
mInfo("trans:%d, is proposed and post sem", transId, tstrerror(pMgmt->errCode));
|
||||
mDebug("trans:%d, is proposed and post sem", transId, tstrerror(pMgmt->errCode));
|
||||
}
|
||||
pMgmt->transId = 0;
|
||||
taosWUnLockLatch(&pMgmt->lock);
|
||||
|
@ -118,7 +118,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
|
|||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
|
||||
pMgmt->errCode = cbMeta.code;
|
||||
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
|
||||
mDebug("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
|
||||
cbMeta.code, cbMeta.index, cbMeta.term);
|
||||
|
||||
taosWLockLatch(&pMgmt->lock);
|
||||
|
@ -126,7 +126,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
|
|||
if (pMgmt->errCode != 0) {
|
||||
mError("trans:-1, failed to propose sync reconfig since %s, post sem", tstrerror(pMgmt->errCode));
|
||||
} else {
|
||||
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64 " post sem",
|
||||
mDebug("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64 " post sem",
|
||||
pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term);
|
||||
}
|
||||
pMgmt->transId = 0;
|
||||
|
@ -228,7 +228,7 @@ int32_t mndInitSync(SMnode *pMnode) {
|
|||
syncInfo.isStandBy = pMgmt->standby;
|
||||
syncInfo.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT;
|
||||
|
||||
mInfo("start to open mnode sync, standby:%d", pMgmt->standby);
|
||||
mDebug("start to open mnode sync, standby:%d", pMgmt->standby);
|
||||
if (pMgmt->standby || pMgmt->replica.id > 0) {
|
||||
SSyncCfg *pCfg = &syncInfo.syncCfg;
|
||||
pCfg->replicaNum = 1;
|
||||
|
@ -236,7 +236,7 @@ int32_t mndInitSync(SMnode *pMnode) {
|
|||
SNodeInfo *pNode = &pCfg->nodeInfo[0];
|
||||
tstrncpy(pNode->nodeFqdn, pMgmt->replica.fqdn, sizeof(pNode->nodeFqdn));
|
||||
pNode->nodePort = pMgmt->replica.port;
|
||||
mInfo("mnode ep:%s:%u", pNode->nodeFqdn, pNode->nodePort);
|
||||
mDebug("mnode ep:%s:%u", pNode->nodeFqdn, pNode->nodePort);
|
||||
}
|
||||
|
||||
tsem_init(&pMgmt->syncSem, 0, 0);
|
||||
|
|
|
@ -145,7 +145,7 @@ static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t *buf
|
|||
if (nread < 0) {
|
||||
uError("http-report read error:%s", uv_err_name(nread));
|
||||
} else {
|
||||
uInfo("http-report succ to read %d bytes, just ignore it", nread);
|
||||
uTrace("http-report succ to read %d bytes, just ignore it", nread);
|
||||
}
|
||||
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
||||
}
|
||||
|
@ -155,7 +155,7 @@ static void clientSentCb(uv_write_t* req, int32_t status) {
|
|||
terrno = TAOS_SYSTEM_ERROR(status);
|
||||
uError("http-report failed to send data %s", uv_strerror(status));
|
||||
} else {
|
||||
uInfo("http-report succ to send data");
|
||||
uTrace("http-report succ to send data");
|
||||
}
|
||||
uv_read_start((uv_stream_t *)&cli->tcp, clientAllocBuffCb, clientRecvCb);
|
||||
}
|
||||
|
|
|
@ -702,7 +702,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
|
|||
taosMsleep(50);
|
||||
}
|
||||
|
||||
uInfo("cache:%s will be cleaned up", pCacheObj->name);
|
||||
uTrace("cache:%s will be cleaned up", pCacheObj->name);
|
||||
doCleanupDataCache(pCacheObj);
|
||||
}
|
||||
|
||||
|
|
|
@ -83,8 +83,8 @@ int32_t tsCompressInit() {
|
|||
if (lossyFloat == false && lossyDouble == false) return 0;
|
||||
|
||||
tdszInit(fPrecision, dPrecision, maxRange, curRange, Compressor);
|
||||
if (lossyFloat) uInfo("lossy compression float is opened. ");
|
||||
if (lossyDouble) uInfo("lossy compression double is opened. ");
|
||||
if (lossyFloat) uTrace("lossy compression float is opened. ");
|
||||
if (lossyDouble) uTrace("lossy compression double is opened. ");
|
||||
return 1;
|
||||
}
|
||||
// exit call
|
||||
|
|
|
@ -21,6 +21,6 @@ sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 bin
|
|||
|
||||
print =============== create drop qnode 1
|
||||
sql create qnode on dnode 1
|
||||
sql create snode on dnode 1
|
||||
sql create bnode on dnode 1
|
||||
#sql create snode on dnode 1
|
||||
#sql create bnode on dnode 1
|
||||
|
||||
|
|
Loading…
Reference in New Issue