From df2331d5e7dcb2c090a677464a6c5ff29b007f67 Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Thu, 7 Nov 2024 15:29:33 +0800 Subject: [PATCH] Add config req and config thread. --- include/common/tglobal.h | 4 +- include/common/tmsg.h | 8 ++++ include/common/tmsgdef.h | 1 + source/common/src/tglobal.c | 2 + source/common/src/tmsg.c | 40 ++++++++++++++-- source/dnode/mgmt/mgmt_dnode/inc/dmInt.h | 2 + source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 48 +++++++++++++++++++ source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 52 +++++++++++++++++++++ 8 files changed, 152 insertions(+), 5 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index c8dc12ed3b..56a258b01e 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -49,6 +49,8 @@ extern char tsLocalEp[]; extern char tsVersionName[]; extern uint16_t tsServerPort; extern int32_t tsVersion; +int32_t configVersion; +int32_t configInited; extern int32_t tsStatusInterval; extern int32_t tsNumOfSupportVnodes; extern char tsEncryptAlgorithm[]; @@ -154,7 +156,7 @@ extern bool tsEnableCrashReport; extern char *tsTelemUri; extern char *tsClientCrashReportUri; extern char *tsSvrCrashReportUri; -extern int8_t tsSafetyCheckLevel; +extern int8_t tsSafetyCheckLevel; enum { TSDB_SAFETY_CHECK_LEVELL_NEVER = 0, TSDB_SAFETY_CHECK_LEVELL_NORMAL = 1, diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 53366f6f9d..c5bca8a5f7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1806,6 +1806,14 @@ int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); void tFreeSStatusReq(SStatusReq* pReq); +typedef struct { + int32_t cver; +} SConfigReq; + +int32_t tSerializeSConfigReq(void* buf, int32_t bufLen, SConfigReq* pReq); +int32_t tDeserializeSConfigReq(void* buf, int32_t bufLen, SConfigReq* pReq); +// void tFreeSStatusReq(SStatusReq* pReq); + typedef struct { int32_t dnodeId; char machineId[TSDB_MACHINE_ID_LEN + 1]; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index c22a3da5ad..93bfe306b6 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -260,6 +260,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_STREAM_TASK_RESET, "stream-reset-tasks", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUDIT, "audit", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CONFIG, "init-config", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 7eab064eea..c83e3e8db2 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -43,6 +43,8 @@ char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port char tsVersionName[16] = "community"; uint16_t tsServerPort = 6030; int32_t tsVersion = 30000000; +int32_t configVersion = 0; +int32_t configInited = 0; int32_t tsStatusInterval = 1; // second int32_t tsNumOfSupportVnodes = 256; char tsEncryptAlgorithm[16] = {0}; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 458badc764..ab63d8083c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1600,6 +1600,38 @@ _exit: return code; } +int32_t tSerializeSConfigReq(void *buf, int32_t bufLen, SConfigReq *pReq) { + SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; + tEncoderInit(&encoder, buf, bufLen); + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->cver)); + tEndEncode(&encoder); +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSConfigReq(void *buf, int32_t bufLen, SConfigReq *pReq) { + SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->cver)); + tEndDecode(&decoder); +_exit: + tDecoderClear(&decoder); + return code; +} + void tFreeSStatusReq(SStatusReq *pReq) { taosArrayDestroy(pReq->pVloads); } int32_t tSerializeSDnodeInfoReq(void *buf, int32_t bufLen, SDnodeInfoReq *pReq) { @@ -2221,10 +2253,10 @@ int32_t tDeserializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnal int32_t lino; tDecoderInit(&decoder, buf, bufLen); - int32_t numOfAlgos = 0; - int32_t nameLen; - int32_t type; - char name[TSDB_ANAL_ALGO_KEY_LEN]; + int32_t numOfAlgos = 0; + int32_t nameLen; + int32_t type; + char name[TSDB_ANAL_ALGO_KEY_LEN]; SAnalUrl url = {0}; TAOS_CHECK_EXIT(tStartDecode(&decoder)); diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index cbf1959e75..47bbb88ee1 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -28,6 +28,7 @@ typedef struct SDnodeMgmt { const char *path; const char *name; TdThread statusThread; + TdThread configThread; TdThread statusInfoThread; TdThread notifyThread; TdThread monitorThread; @@ -50,6 +51,7 @@ typedef struct SDnodeMgmt { // dmHandle.c SArray *dmGetMsgHandles(); void dmSendStatusReq(SDnodeMgmt *pMgmt); +void dmSendConfigReq(SDnodeMgmt *pMgmt); void dmUpdateStatusInfo(SDnodeMgmt *pMgmt); void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq); int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index d6b792ca74..bda9dc3944 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -283,6 +283,54 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { dmProcessStatusRsp(pMgmt, &rpcRsp); } +void dmSendConfigReq(SDnodeMgmt *pMgmt) { + int32_t code = 0; + SConfigReq req = {0}; + + req.cver = configVersion; + dDebug("send config req to mnode, configVersion:%d", req.cver); + + int32_t contLen = tSerializeSConfigReq(NULL, 0, &req); + if (contLen < 0) { + dError("failed to serialize status req since %s", tstrerror(contLen)); + return; + } + + void *pHead = rpcMallocCont(contLen); + contLen = tSerializeSConfigReq(pHead, contLen, &req); + if (contLen < 0) { + rpcFreeCont(pHead); + dError("failed to serialize status req since %s", tstrerror(contLen)); + return; + } + + SRpcMsg rpcMsg = {.pCont = pHead, + .contLen = contLen, + .msgType = TDMT_MND_CONFIG, + .info.ahandle = 0, + .info.notFreeAhandle = 1, + .info.refId = 0, + .info.noResp = 0, + .info.handle = 0}; + SRpcMsg rpcRsp = {0}; + + SEpSet epSet = {0}; + int8_t epUpdated = 0; + (void)dmGetMnodeEpSet(pMgmt->pData, &epSet); + + dDebug("send status req to mnode, statusSeq:%d, begin to send rpc msg", pMgmt->statusSeq); + code = + rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000); + if (code != 0) { + dError("failed to send status req since %s", tstrerror(code)); + return; + } + + if (rpcRsp.code != 0) { + } else { + } +} + void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) { SMonVloadInfo vinfo = {0}; dDebug("begin to get vnode loads"); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 7f802f3837..9c6cd4fdd6 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -47,6 +47,35 @@ static void *dmStatusThreadFp(void *param) { return NULL; } +static void *dmConfigThreadFp(void *param) { + SDnodeMgmt *pMgmt = param; + int64_t lastTime = taosGetTimestampMs(); + setThreadName("dnode-config"); + + int32_t upTimeCount = 0; + int64_t upTime = 0; + + while (1) { + taosMsleep(200); + if (pMgmt->pData->dropped || pMgmt->pData->stopped || configInited) break; + + int64_t curTime = taosGetTimestampMs(); + if (curTime < lastTime) lastTime = curTime; + float interval = (curTime - lastTime) / 1000.0f; + if (interval >= tsStatusInterval) { + dmSendConfigReq(pMgmt); + lastTime = curTime; + + if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) { + upTime = taosGetOsUptime() - tsDndStartOsUptime; + tsDndUpTime = TMAX(tsDndUpTime, upTime); + } + } + } + + return NULL; +} + static void *dmStatusInfoThreadFp(void *param) { SDnodeMgmt *pMgmt = param; int64_t lastTime = taosGetTimestampMs(); @@ -309,6 +338,22 @@ int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) { return 0; } +int32_t dmStartConfigThread(SDnodeMgmt *pMgmt) { + int32_t code = 0; + TdThreadAttr thAttr; + (void)taosThreadAttrInit(&thAttr); + (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pMgmt->configThread, &thAttr, dmConfigThreadFp, pMgmt) != 0) { + code = TAOS_SYSTEM_ERROR(errno); + dError("failed to create config thread since %s", tstrerror(code)); + return code; + } + + (void)taosThreadAttrDestroy(&thAttr); + tmsgReportStartup("config-status", "initialized"); + return 0; +} + int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) { int32_t code = 0; TdThreadAttr thAttr; @@ -332,6 +377,13 @@ void dmStopStatusThread(SDnodeMgmt *pMgmt) { } } +void dmStopConfigThread(SDnodeMgmt *pMgmt) { + if (taosCheckPthreadValid(pMgmt->configThread)) { + (void)taosThreadJoin(pMgmt->configThread, NULL); + taosThreadClear(&pMgmt->configThread); + } +} + void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) { if (taosCheckPthreadValid(pMgmt->statusInfoThread)) { (void)taosThreadJoin(pMgmt->statusInfoThread, NULL);