From 042291c35b736f9356fa7463e3f5f6f5d7353466 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 1 Dec 2023 18:15:47 +0800 Subject: [PATCH] feat: support machine id --- include/common/tgrant.h | 1 + include/util/tdef.h | 1 + source/dnode/mnode/impl/inc/mndCluster.h | 3 + source/dnode/mnode/impl/inc/mndDef.h | 5 + source/dnode/mnode/impl/src/mndCluster.c | 154 +++++++++++++++++++++-- source/dnode/mnode/impl/src/mndGrant.c | 1 + 6 files changed, 158 insertions(+), 7 deletions(-) diff --git a/include/common/tgrant.h b/include/common/tgrant.h index f06fca8014..bc0e5a9cca 100644 --- a/include/common/tgrant.h +++ b/include/common/tgrant.h @@ -52,6 +52,7 @@ typedef enum { int32_t grantCheck(EGrantType grant); int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, char* out, int8_t type); +char* grantGetMachineId(); #ifndef GRANTS_CFG #ifdef TD_ENTERPRISE diff --git a/include/util/tdef.h b/include/util/tdef.h index 69d0c1126d..6c7b6d1dbc 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -264,6 +264,7 @@ typedef enum ELogicConditionType { #define TSDB_JOB_STATUS_LEN 32 #define TSDB_CLUSTER_ID_LEN 40 +#define TSDB_MACHINE_ID_LEN 24 #define TSDB_FQDN_LEN 128 #define TSDB_EP_LEN (TSDB_FQDN_LEN + 6) #define TSDB_IPv4ADDR_LEN 16 diff --git a/source/dnode/mnode/impl/inc/mndCluster.h b/source/dnode/mnode/impl/inc/mndCluster.h index e33ffdb372..73cfe45ff5 100644 --- a/source/dnode/mnode/impl/inc/mndCluster.h +++ b/source/dnode/mnode/impl/inc/mndCluster.h @@ -28,6 +28,9 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len); int64_t mndGetClusterId(SMnode *pMnode); int64_t mndGetClusterCreateTime(SMnode *pMnode); int64_t mndGetClusterUpTime(SMnode *pMnode); +int32_t mndProcessClusterMachineIds(SMnode *pMnode, SMachineId *pIds, int32_t nIds); +int32_t mndDupClusterObj(SClusterObj *pOld, SClusterObj *pNew); +void mndFreeClusterObj(SClusterObj *pCluster); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 08c0aec46a..4f6fa65063 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -186,12 +186,17 @@ typedef struct { TdThreadMutex mutex; } STrans; +typedef struct { + char id[TSDB_MACHINE_ID_LEN + 1]; +} SMachineId; + typedef struct { int64_t id; char name[TSDB_CLUSTER_ID_LEN]; int64_t createdTime; int64_t updateTime; int32_t upTime; + SArray* pMachineIds; } SClusterObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 4c799e1e1e..794f8724c2 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -18,7 +18,7 @@ #include "mndShow.h" #include "mndTrans.h" -#define CLUSTER_VER_NUMBE 1 +#define CLUSTER_VER_NUMBE 2 #define CLUSTER_RESERVE_SIZE 60 int64_t tsExpireTime = 0; @@ -137,7 +137,9 @@ int64_t mndGetClusterUpTime(SMnode *pMnode) { static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { terrno = TSDB_CODE_OUT_OF_MEMORY; - SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, CLUSTER_VER_NUMBE, sizeof(SClusterObj) + CLUSTER_RESERVE_SIZE); + int32_t nMachineIds = taosArrayGetSize(pCluster->pMachineIds); + int32_t machineSize = sizeof(int16_t) + nMachineIds * sizeof(SMachineId); + SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, CLUSTER_VER_NUMBE, sizeof(SClusterObj) + machineSize + CLUSTER_RESERVE_SIZE); if (pRaw == NULL) goto _OVER; int32_t dataPos = 0; @@ -146,7 +148,12 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER) SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER) SDB_SET_INT32(pRaw, dataPos, pCluster->upTime, _OVER) + SDB_SET_INT16(pRaw, dataPos, nMachineIds, _OVER) + for (int32_t i = 0; i < nMachineIds; ++i) { + SDB_SET_BINARY(pRaw, dataPos, ((SMachineId*)TARRAY_GET_ELEM(pCluster->pMachineIds, i))->id, TSDB_MACHINE_ID_LEN, _OVER) + } SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER) + SDB_SET_DATALEN(pRaw, dataPos, _OVER); terrno = 0; @@ -164,12 +171,12 @@ _OVER: static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; SClusterObj *pCluster = NULL; - SSdbRow *pRow = NULL; + SSdbRow *pRow = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; - if (sver != CLUSTER_VER_NUMBE) { + if (sver < 0 || sver > CLUSTER_VER_NUMBE) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; goto _OVER; } @@ -186,6 +193,19 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, _OVER) SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER) SDB_GET_INT32(pRaw, dataPos, &pCluster->upTime, _OVER) + + if (sver > 1) { + int16_t nMachineIds = 0; + SDB_GET_INT16(pRaw, dataPos, &nMachineIds, _OVER) + if (nMachineIds > 0) { + pCluster->pMachineIds = taosArrayInit(nMachineIds, sizeof(SMachineId)); + if (!pCluster->pMachineIds) goto _OVER; + for (int16_t i = 0; i < nMachineIds; ++i) { + SDB_GET_BINARY(pRaw, dataPos, ((SMachineId *)TARRAY_GET_ELEM(pCluster->pMachineIds, i))->id, + TSDB_MACHINE_ID_LEN, _OVER) + } + } + } SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER) terrno = 0; @@ -194,6 +214,7 @@ _OVER: if (terrno != 0) { mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster == NULL ? 0 : pCluster->id, pRaw, terrstr()); + mndFreeClusterObj(pCluster); taosMemoryFreeClear(pRow); return NULL; } @@ -236,10 +257,23 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { clusterObj.id = mndGenerateUid(clusterObj.name, TSDB_CLUSTER_ID_LEN); clusterObj.id = (clusterObj.id >= 0 ? clusterObj.id : -clusterObj.id); pMnode->clusterId = clusterObj.id; + clusterObj.pMachineIds = taosArrayInit(1, sizeof(SMachineId)); + if(!clusterObj.pMachineIds) { + return -1; + } + char *machineId = grantGetMachineId(); + if (machineId) { + taosArrayPush(clusterObj.pMachineIds, machineId); + taosMemoryFree(machineId); + } + mInfo("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name); SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj); - if (pRaw == NULL) return -1; + if (pRaw == NULL) { + mndFreeClusterObj(&clusterObj); + return -1; + } (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); mInfo("cluster:%" PRId64 ", will be created when deploying, raw:%p", clusterObj.id, pRaw); @@ -247,6 +281,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, "create-cluster"); if (pTrans == NULL) { sdbFreeRaw(pRaw); + mndFreeClusterObj(&clusterObj); mError("cluster:%" PRId64 ", failed to create since %s", clusterObj.id, terrstr()); return -1; } @@ -255,6 +290,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { if (mndTransAppendCommitlog(pTrans, pRaw) != 0) { mError("trans:%d, failed to commit redo log since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); + mndFreeClusterObj(&clusterObj); return -1; } (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); @@ -262,10 +298,12 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); + mndFreeClusterObj(&clusterObj); return -1; } mndTransDrop(pTrans); + mndFreeClusterObj(&clusterObj); return 0; } @@ -328,24 +366,33 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) { void *pIter = NULL; SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter); if (pCluster != NULL) { - memcpy(&clusterObj, pCluster, sizeof(SClusterObj)); + if(mndDupClusterObj(pCluster, &clusterObj) != 0){ + mndReleaseCluster(pMnode, pCluster, pIter); + mndFreeClusterObj(&clusterObj); + return -1; + } clusterObj.upTime += tsUptimeInterval; mndReleaseCluster(pMnode, pCluster, pIter); } if (clusterObj.id <= 0) { mError("can't get cluster info while update uptime"); + mndFreeClusterObj(&clusterObj); return 0; } mInfo("update cluster uptime to %d", clusterObj.upTime); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-uptime"); - if (pTrans == NULL) return -1; + if (pTrans == NULL) { + mndFreeClusterObj(&clusterObj); + return -1; + } SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); + mndFreeClusterObj(&clusterObj); return -1; } (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); @@ -353,9 +400,102 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) { if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); + mndFreeClusterObj(&clusterObj); return -1; } mndTransDrop(pTrans); + mndFreeClusterObj(&clusterObj); + return 0; +} + +int32_t mndDupClusterObj(SClusterObj *pOld, SClusterObj *pNew) { + memcpy(pNew, pOld, sizeof(SClusterObj)); + pNew->updateTime = taosGetTimestampMs(); + pNew->pMachineIds = taosArrayInit(taosArrayGetSize(pOld->pMachineIds), sizeof(SMachineId)); + if (!pNew->pMachineIds) return -1; + taosArrayAddAll(pNew->pMachineIds, pOld->pMachineIds); + return 0; +} + +void mndFreeClusterObj(SClusterObj *pCluster) { + if (pCluster) { + pCluster->pMachineIds = taosArrayDestroy(pCluster->pMachineIds); + } +} + +int32_t mndProcessClusterMachineIds(SMnode *pMnode, SMachineId *pIds, int32_t nIds) { + SClusterObj clusterObj = {0}; + void *pIter = NULL; + SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter); + if (pCluster != NULL) { + int32_t nDups = 0; + int32_t size = taosArrayGetSize(pCluster->pMachineIds); + for (int32_t n = 0; n < nIds; ++n) { + bool exist = false; + for (int32_t i = 0; i < size; ++i) { + SMachineId *pId = TARRAY_GET_ELEM(pCluster->pMachineIds, i); + if (0 == strncmp(pId->id, (pIds + nIds)->id, TSDB_MACHINE_ID_LEN + 1)) { + exist = true; + ++nDups; + break; + } + } + if (!exist) { + if (!clusterObj.pMachineIds) { + if(mndDupClusterObj(pCluster, &clusterObj) != 0){ + mndReleaseCluster(pMnode, pCluster, pIter); + mndFreeClusterObj(&clusterObj); + return -1; + } + } + if (!taosArrayPush(clusterObj.pMachineIds, pIds + n)) { + mndReleaseCluster(pMnode, pCluster, pIter); + mndFreeClusterObj(&clusterObj); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + } + + if (nDups == nIds) { + mndReleaseCluster(pMnode, pCluster, pIter); + mndFreeClusterObj(&clusterObj); + return 0; + } + + mndReleaseCluster(pMnode, pCluster, pIter); + } + + if (clusterObj.id <= 0) { + mError("can't get cluster info while process machine-id"); + mndFreeClusterObj(&clusterObj); + return -1; + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "machine-id"); + if (pTrans == NULL) { + mndFreeClusterObj(&clusterObj); + return -1; + } + + SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + mndFreeClusterObj(&clusterObj); + return -1; + } + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + mndFreeClusterObj(&clusterObj); + return -1; + } + + mndTransDrop(pTrans); + mndFreeClusterObj(&clusterObj); return 0; } diff --git a/source/dnode/mnode/impl/src/mndGrant.c b/source/dnode/mnode/impl/src/mndGrant.c index c4e1894263..cc4b583b83 100644 --- a/source/dnode/mnode/impl/src/mndGrant.c +++ b/source/dnode/mnode/impl/src/mndGrant.c @@ -129,6 +129,7 @@ void grantParseParameter() { mError("can't parsed parameter k"); } void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value) {} void grantAdd(EGrantType grant, uint64_t value) {} void grantRestore(EGrantType grant, uint64_t value) {} +char *grantGetMachineId(){return NULL}; int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } int32_t grantAlterActiveCode(int32_t did, const char *old, const char *new, char *out, int8_t type) {