From cebbc0719be5b16db0a2e06e14feaa9a0c2bb617 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 2 Dec 2021 11:22:03 +0800 Subject: [PATCH] TD-10431 mnode cluster --- include/common/taosmsg.h | 6 +- include/dnode/mnode/mnode.h | 2 +- include/os/osSysinfo.h | 2 +- source/dnode/mgmt/impl/inc/dndDnode.h | 2 +- source/dnode/mgmt/impl/inc/dndInt.h | 2 +- source/dnode/mgmt/impl/src/dndDnode.c | 14 ++-- source/dnode/mgmt/impl/src/dndTransport.c | 6 +- source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/mnode/impl/inc/mndInt.h | 6 +- source/dnode/mnode/impl/src/mndCluster.c | 96 ++++++++++++++++++++++- source/dnode/mnode/impl/src/mndDnode.c | 11 ++- source/dnode/mnode/impl/src/mndTelem.c | 4 +- source/dnode/mnode/impl/src/mnode.c | 24 ------ source/os/src/osSysinfo.c | 17 ++-- 14 files changed, 128 insertions(+), 66 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index b655016f3c..9bae38e3a7 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -658,7 +658,7 @@ typedef struct { typedef struct SStatusMsg { int32_t sver; int32_t dnodeId; - int64_t clusterId; + int32_t clusterId; uint32_t rebootTime; // time stamp for last reboot int16_t numOfCores; int16_t numOfSupportMnodes; @@ -671,9 +671,9 @@ typedef struct SStatusMsg { typedef struct { int32_t dnodeId; + int32_t clusterId; int8_t dropped; - char reserved[3]; - int64_t clusterId; + char reserved[7]; } SDnodeCfg; typedef struct { diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index a8a8117886..c7415af0d5 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -45,7 +45,7 @@ typedef struct SMnodeLoad { typedef struct { int32_t dnodeId; - int64_t clusterId; + int32_t clusterId; int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index a3919890bd..56f6b3e0da 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -52,7 +52,7 @@ bool taosGetSysMemory(float *memoryUsedMB); void taosPrintOsInfo(); int taosSystem(const char *cmd); void taosKillSystem(); -bool taosGetSystemUid(char *uid, int32_t uidlen); +int32_t taosGetSystemUid(char *uid, int32_t uidlen); char * taosGetCmdlineByPID(int pid); void taosSetCoreDump(bool enable); diff --git a/source/dnode/mgmt/impl/inc/dndDnode.h b/source/dnode/mgmt/impl/inc/dndDnode.h index 4bb4cad8cc..c21c6a0b86 100644 --- a/source/dnode/mgmt/impl/inc/dndDnode.h +++ b/source/dnode/mgmt/impl/inc/dndDnode.h @@ -27,7 +27,7 @@ void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t dndGetDnodeId(SDnode *pDnode); -int64_t dndGetClusterId(SDnode *pDnode); +int32_t dndGetClusterId(SDnode *pDnode); void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 39243a1795..136f6eee0c 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -57,8 +57,8 @@ typedef struct { typedef struct { int32_t dnodeId; int32_t dropped; + int32_t clusterId; uint32_t rebootTime; - int64_t clusterId; SEpSet mnodeEpSet; char *file; SHashObj *dnodeHash; diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 20941847d3..854e7e3570 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -26,10 +26,10 @@ int32_t dndGetDnodeId(SDnode *pDnode) { return dnodeId; } -int64_t dndGetClusterId(SDnode *pDnode) { +int32_t dndGetClusterId(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; taosRLockLatch(&pMgmt->latch); - int64_t clusterId = pMgmt->clusterId; + int32_t clusterId = pMgmt->clusterId; taosRUnLockLatch(&pMgmt->latch); return clusterId; } @@ -201,7 +201,7 @@ static int32_t dndReadDnodes(SDnode *pDnode) { dError("failed to read %s since clusterId not found", pMgmt->file); goto PRASE_DNODE_OVER; } - pMgmt->clusterId = atoll(clusterId->valuestring); + pMgmt->clusterId = atol(clusterId->valuestring); cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_String) { @@ -308,7 +308,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pMgmt->dnodeId); - len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId); + len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%d\",\n", pMgmt->clusterId); len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pMgmt->dropped); len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n"); for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) { @@ -350,7 +350,7 @@ static void dndSendStatusMsg(SDnode *pDnode) { taosRLockLatch(&pMgmt->latch); pStatus->sver = htonl(pDnode->opt.sver); pStatus->dnodeId = htonl(pMgmt->dnodeId); - pStatus->clusterId = htobe64(pMgmt->clusterId); + pStatus->clusterId = htonl(pMgmt->clusterId); pStatus->rebootTime = htonl(pMgmt->rebootTime); pStatus->numOfCores = htons(pDnode->opt.numOfCores); pStatus->numOfSupportMnodes = htons(pDnode->opt.numOfCores); @@ -379,7 +379,7 @@ static void dndSendStatusMsg(SDnode *pDnode) { static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; if (pMgmt->dnodeId == 0 || pMgmt->dropped != pCfg->dropped) { - dInfo("set dnodeId:%d clusterId:%" PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped); + dInfo("set dnodeId:%d clusterId:%d dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped); taosWLockLatch(&pMgmt->latch); pMgmt->dnodeId = pCfg->dnodeId; @@ -420,7 +420,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SStatusRsp *pRsp = pMsg->pCont; SDnodeCfg *pCfg = &pRsp->dnodeCfg; pCfg->dnodeId = htonl(pCfg->dnodeId); - pCfg->clusterId = htobe64(pCfg->clusterId); + pCfg->clusterId = htonl(pCfg->clusterId); dndUpdateDnodeCfg(pDnode, pCfg); if (pCfg->dropped) return; diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 7e2bc69ea8..e33fba494c 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -58,6 +58,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_DROP_DNODE] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_MNODE] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_DROP_MNODE] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_DB] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_DROP_DB] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_USE_DB] = dndProcessMnodeWriteMsg; @@ -115,12 +117,12 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = dndProcessMnodeWriteMsg; // message from dnode to mnode - pMgmt->msgFp[TSDB_MSG_TYPE_AUTH] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dndProcessDnodeRsp; pMgmt->msgFp[TSDB_MSG_TYPE_GRANT] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_GRANT_RSP] = dndProcessDnodeRsp; pMgmt->msgFp[TSDB_MSG_TYPE_STATUS] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dndProcessDnodeRsp; + pMgmt->msgFp[TSDB_MSG_TYPE_AUTH] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dndProcessDnodeRsp; } static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 9a48d440dc..9facb8829c 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -115,7 +115,7 @@ typedef struct STrans { } STrans; typedef struct SClusterObj { - int64_t id; + int32_t id; char uid[TSDB_CLUSTER_ID_LEN]; int64_t createdTime; int64_t updateTime; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index bb3f0ca263..8910ed4e63 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -36,7 +36,7 @@ typedef struct { typedef struct SMnode { int32_t dnodeId; - int64_t clusterId; + int32_t clusterId; int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; @@ -58,10 +58,6 @@ typedef struct SMnode { char *charset; } SMnode; -tmr_h mndGetTimer(SMnode *pMnode); -int32_t mndGetDnodeId(SMnode *pMnode); -int64_t mndGetClusterId(SMnode *pMnode); - void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); void mndSendMsgToMnode(SMnode *pMnode, SRpcMsg *pMsg); void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg); diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 98e9e70229..2e2e897cfb 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -14,8 +14,96 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "mndInt.h" +#include "mndCluster.h" +#include "mndTrans.h" -int32_t mndInitCluster(SMnode *pMnode) { return 0; } -void mndCleanupCluster(SMnode *pMnode) {} \ No newline at end of file +#define SDB_CLUSTER_VER 1 + +static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { + SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, SDB_CLUSTER_VER, sizeof(SClusterObj)); + if (pRaw == NULL) return NULL; + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, pCluster->id); + SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime) + SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime) + SDB_SET_BINARY(pRaw, dataPos, pCluster->uid, TSDB_CLUSTER_ID_LEN) + + return pRaw; +} + +static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != SDB_CLUSTER_VER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + mError("failed to decode cluster since %s", terrstr()); + return NULL; + } + + SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj)); + SClusterObj *pCluster = sdbGetRowObj(pRow); + if (pCluster == NULL) return NULL; + + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, pRow, dataPos, &pCluster->id) + SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->createdTime) + SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->updateTime) + SDB_GET_BINARY(pRaw, pRow, dataPos, pCluster->uid, TSDB_CLUSTER_ID_LEN) + + return pRow; +} + +static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) { + mTrace("cluster:%d, perform insert action", pCluster->id); + return 0; +} + +static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) { + mTrace("cluster:%d, perform delete action", pCluster->id); + return 0; +} + +static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pSrcCluster, SClusterObj *pDstCluster) { + mTrace("cluster:%d, perform update action", pSrcCluster->id); + return 0; +} + +static int32_t mndCreateDefaultCluster(SMnode *pMnode) { + SClusterObj clusterObj = {0}; + clusterObj.createdTime = taosGetTimestampMs(); + clusterObj.updateTime = clusterObj.createdTime; + + int32_t code = taosGetSystemUid(clusterObj.uid, TSDB_CLUSTER_ID_LEN); + if (code != 0) { + strcpy(clusterObj.uid, "tdengine2.0"); + mError("failed to get uid from system, set to default val %s", clusterObj.uid); + } else { + mDebug("cluster:%d, uid is %s", clusterObj.id, clusterObj.uid); + } + clusterObj.id = MurmurHash3_32(clusterObj.uid, TSDB_CLUSTER_ID_LEN); + pMnode->clusterId = clusterObj.id; + + SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj); + if (pRaw == NULL) return -1; + sdbSetRawStatus(pRaw, SDB_STATUS_READY); + + mTrace("cluster:%d, will be created while deploy sdb", clusterObj.id); + return sdbWrite(pMnode->pSdb, pRaw); +} + +int32_t mndInitCluster(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_CLUSTER, + .keyType = SDB_KEY_INT32, + .deployFp = (SdbDeployFp)mndCreateDefaultCluster, + .encodeFp = (SdbEncodeFp)mndClusterActionEncode, + .decodeFp = (SdbDecodeFp)mndClusterActionDecode, + .insertFp = (SdbInsertFp)mndClusterActionInsert, + .updateFp = (SdbUpdateFp)mndClusterActionUpdate, + .deleteFp = (SdbDeleteFp)mndClusterActionDelete}; + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupCluster(SMnode *pMnode) {} diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 17a7d1dd9d..aaef8dbf4b 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -214,7 +214,7 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { static void mndParseStatusMsg(SStatusMsg *pStatus) { pStatus->sver = htonl(pStatus->sver); pStatus->dnodeId = htonl(pStatus->dnodeId); - pStatus->clusterId = htobe64(pStatus->clusterId); + pStatus->clusterId = htonl(pStatus->clusterId); pStatus->rebootTime = htonl(pStatus->rebootTime); pStatus->numOfCores = htons(pStatus->numOfCores); pStatus->numOfSupportMnodes = htons(pStatus->numOfSupportMnodes); @@ -259,15 +259,14 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return TSDB_CODE_MND_INVALID_MSG_VERSION; } - int64_t clusterId = mndGetClusterId(pMnode); if (pStatus->dnodeId == 0) { - mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, clusterId); + mDebug("dnode:%d %s, first access, set clusterId %d", pDnode->id, pDnode->ep, pMnode->clusterId); } else { - if (pStatus->clusterId != clusterId) { + if (pStatus->clusterId != pMnode->clusterId) { if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; } - mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, clusterId); + mError("dnode:%d, clusterId %d not match exist %d", pDnode->id, pStatus->clusterId, pMnode->clusterId); mndReleaseDnode(pMnode, pDnode); return TSDB_CODE_MND_INVALID_CLUSTER_ID; } else { @@ -306,7 +305,7 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); pRsp->dnodeCfg.dropped = 0; - pRsp->dnodeCfg.clusterId = htobe64(clusterId); + pRsp->dnodeCfg.clusterId = htonl(pMnode->clusterId); mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); pMsg->contLen = contLen; diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index f7fafc7095..f9f349aad8 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -203,9 +203,9 @@ static void mndSendTelemetryReport() { return; } - int64_t clusterId = mndGetClusterId(NULL); + int32_t clusterId = 0; char clusterIdStr[20] = {0}; - snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId); + snprintf(clusterIdStr, sizeof(clusterIdStr), "%d", clusterId); SBufferWriter bw = tbufInitWriter(NULL, false); mndBeginObject(&bw); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index a853d98634..f6d62ebc8b 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -32,30 +32,6 @@ #include "mndUser.h" #include "mndVgroup.h" -int32_t mndGetDnodeId(SMnode *pMnode) { - if (pMnode != NULL) { - return pMnode->dnodeId; - } - - return -1; -} - -int64_t mndGetClusterId(SMnode *pMnode) { - if (pMnode != NULL) { - return pMnode->clusterId; - } - - return -1; -} - -tmr_h mndGetTimer(SMnode *pMnode) { - if (pMnode != NULL) { - return pMnode->timer; - } - - return NULL; -} - void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) { if (pMnode != NULL && pMnode->sendMsgToDnodeFp != NULL) { (*pMnode->sendMsgToDnodeFp)(pMnode->pDnode, pEpSet, pMsg); diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index e37e059b7d..ca817c4c1e 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -252,14 +252,14 @@ LONG WINAPI FlCrashDump(PEXCEPTION_POINTERS ep) { void taosSetCoreDump() { SetUnhandledExceptionFilter(&FlCrashDump); } -bool taosGetSystemUid(char *uid) { +int32_t taosGetSystemUid(char *uid, int32_t uidlen) { GUID guid; CoCreateGuid(&guid); sprintf(uid, "%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X", guid.Data1, guid.Data2, guid.Data3, guid.Data4[0], guid.Data4[1], guid.Data4[2], guid.Data4[3], guid.Data4[4], guid.Data4[5], guid.Data4[6], guid.Data4[7]); - return true; + return 0; } char *taosGetCmdlineByPID(int pid) { return ""; } @@ -452,12 +452,12 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) { } } -bool taosGetSystemUid(char *uid) { +int32_t taosGetSystemUid(char *uid, int32_t uidlen) { uuid_t uuid = {0}; uuid_generate(uuid); // it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null uuid_unparse_lower(uuid, uid); - return true; + return 0; } char *taosGetCmdlineByPID(int pid) { @@ -1070,13 +1070,13 @@ void taosSetCoreDump(bool enable) { #endif } -bool taosGetSystemUid(char *uid, int32_t uidlen) { +int32_t taosGetSystemUid(char *uid, int32_t uidlen) { int fd; int len = 0; fd = open("/proc/sys/kernel/random/uuid", 0); if (fd < 0) { - return false; + return -1; } else { len = read(fd, uid, uidlen); close(fd); @@ -1084,9 +1084,10 @@ bool taosGetSystemUid(char *uid, int32_t uidlen) { if (len >= 36) { uid[36] = 0; - return true; + return 0; } - return false; + + return -1; } char *taosGetCmdlineByPID(int pid) {