From 4adce7ed52f1fb49d2cec1f2d9e1ae9600809400 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 7 Dec 2023 14:11:41 +0800 Subject: [PATCH] feat: support uniq grant --- include/common/tmsgdef.h | 5 +- include/util/tdef.h | 2 +- source/common/src/systable.c | 5 +- source/dnode/mgmt/mgmt_dnode/inc/dmInt.h | 1 + source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 28 +++ source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 3 + source/dnode/mnode/impl/inc/mndCluster.h | 4 - source/dnode/mnode/impl/inc/mndDef.h | 3 +- source/dnode/mnode/impl/src/mndCluster.c | 200 ++------------------ source/dnode/mnode/impl/src/mndDnode.c | 63 ++++-- 10 files changed, 109 insertions(+), 205 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 61b471912f..cad9bb0a51 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -166,7 +166,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) - TD_DEF_MSG_TYPE(TDMT_MND_UNUSED1, "unused", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_GET_MACHINE, "get-machine", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp) @@ -335,8 +335,9 @@ TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_PREP_SNAPSHOT, "sync-prep-snapshot", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_PREP_SNAPSHOT_REPLY, "sync-prep-snapshot-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_UNUSED_CODE, "sync-unused", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) TD_CLOSE_MSG_TYPE(TDMT_END_SYNC_MSG) diff --git a/include/util/tdef.h b/include/util/tdef.h index 4b879db385..7d5bc2bd7c 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -287,7 +287,7 @@ typedef enum ELogicConditionType { #define TSDB_ACTIVE_KEY_LEN 109 #define TSDB_CONN_ACTIVE_KEY_LEN 255 -#define TSDB_UNIQ_ACTIVE_KEY_LEN 255 +#define TSDB_UNIQ_ACTIVE_KEY_LEN 256 #define TSDB_DEFAULT_PKT_SIZE 65480 // same as RPC_MAX_UDP_SIZE diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 7889f2295c..2828a1e305 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -75,7 +75,10 @@ static const SSysDbTableSchema clusterSchema[] = { {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, {.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "expire_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, - {.name = "machine_ids", .bytes = 100 * (TSDB_MACHINE_ID_LEN + 3) + 2 + VARSTR_HEADER_SIZE, .type= TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, +#ifdef TD_ENTERPRISE + {.name = "granted_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, + {.name = "active_code", .bytes = TSDB_UNIQ_ACTIVE_KEY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, +#endif }; static const SSysDbTableSchema userDBSchema[] = { diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index 9e43c2af47..30067f0e01 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -54,6 +54,7 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg); int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg); +int32_t dmProcessGetMachine(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); // dmWorker.c int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index ac94390619..1b3de80610 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -409,6 +409,32 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } +int32_t dmProcessGetMachine(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { + int32_t size = TSDB_MACHINE_ID_LEN; + terrno = 0; + void *pRsp = rpcMallocCont(size); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + dError("failed to retrieve data since %s", terrstr()); + return -1; + } + char *machineId = grantGetMachineId(); + if (machineId && (strlen(machineId) == TSDB_MACHINE_ID_LEN)) { + memcpy(pRsp, machineId, TSDB_MACHINE_ID_LEN); + } else { + terrno = TSDB_CODE_INVALID_DATA_FMT; + rpcFreeCont(pRsp); + pRsp = NULL; + size = 0; + } + + pMsg->code = terrno; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = size; + + return TSDB_CODE_SUCCESS; +} + SArray *dmGetMsgHandles() { int32_t code = -1; SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle)); @@ -425,11 +451,13 @@ SArray *dmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; + // Requests handled by MNODE if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_MACHINE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index d6bdaf51bc..b3971330b9 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -303,6 +303,9 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { case TDMT_MND_GRANT_NOTIFY: code = dmProcessGrantNotify(NULL, pMsg); break; + case TDMT_MND_GET_MACHINE: + code = dmProcessGetMachine(pMgmt, pMsg); + break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; dGError("msg:%p, not processed in mgmt queue", pMsg); diff --git a/source/dnode/mnode/impl/inc/mndCluster.h b/source/dnode/mnode/impl/inc/mndCluster.h index 19f18d9a9d..66c00bae11 100644 --- a/source/dnode/mnode/impl/inc/mndCluster.h +++ b/source/dnode/mnode/impl/inc/mndCluster.h @@ -30,10 +30,6 @@ int64_t mndGetClusterCreateTime(SMnode *pMnode); int64_t mndGetClusterUpTime(SMnode *pMnode); int32_t mndGetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo); int32_t mndGetClusterActive(SMnode *pMnode, char* active); -int32_t mndGetClusterMachineIds(SMnode *pMnode, SArray *pIds); -int32_t mndProcessClusterMachineIds(SMnode *pMnode, SMachineId *pIds, int32_t nIds); -int32_t mndDupClusterObj(SClusterObj *pOld, SClusterObj *pNew); -void mndFreeClusterObj(SClusterObj *pCluster); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 17ca969e4a..2bee27c164 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -197,8 +197,7 @@ typedef struct { int64_t updateTime; int32_t upTime; int64_t grantedTime; - SArray* pMachineIds; - char active[TSDB_UNIQ_ACTIVE_KEY_LEN + 1]; + char active[TSDB_UNIQ_ACTIVE_KEY_LEN]; } SClusterObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index f93eb9a95b..c408defc0d 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -116,7 +116,7 @@ int32_t mndGetClusterActive(SMnode *pMnode, char *active) { void *pIter = NULL; SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter); if (pCluster) { - if (active) strncpy(active, pCluster->active, TSDB_UNIQ_ACTIVE_KEY_LEN + 1); + if (active) strncpy(active, pCluster->active, TSDB_UNIQ_ACTIVE_KEY_LEN); mndReleaseCluster(pMnode, pCluster, pIter); return 0; } @@ -124,22 +124,6 @@ int32_t mndGetClusterActive(SMnode *pMnode, char *active) { return -1; } -int32_t mndGetClusterMachineIds(SMnode *pMnode, SArray *pIds) { - int32_t code = -1; - void *pIter = NULL; - SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter); - if (pCluster) { - if (!pIds) pIds = taosArrayInit(taosArrayGetSize(pCluster->pMachineIds), sizeof(SMachineId)); - if (pIds) { - taosArrayAddAll(pIds, pCluster->pMachineIds); - code = 0; - } - mndReleaseCluster(pMnode, pCluster, pIter); - } - - return code; -} - int64_t mndGetClusterCreateTime(SMnode *pMnode) { int64_t createTime = 0; void *pIter = NULL; @@ -177,9 +161,7 @@ int64_t mndGetClusterUpTime(SMnode *pMnode) { static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { terrno = TSDB_CODE_OUT_OF_MEMORY; - int16_t nMachineIds = taosArrayGetSize(pCluster->pMachineIds); - int32_t machineSize = sizeof(int16_t) + nMachineIds * sizeof(SMachineId); - SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, CLUSTER_VER_NUMBE, sizeof(SClusterObj) + machineSize + CLUSTER_RESERVE_SIZE); + SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, CLUSTER_VER_NUMBE, sizeof(SClusterObj) + CLUSTER_RESERVE_SIZE); if (pRaw == NULL) goto _OVER; int32_t dataPos = 0; @@ -188,10 +170,8 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER) SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER) SDB_SET_INT32(pRaw, dataPos, pCluster->upTime, _OVER) - SDB_SET_INT16(pRaw, dataPos, nMachineIds, _OVER) - for (int16_t i = 0; i < nMachineIds; ++i) { - SDB_SET_BINARY(pRaw, dataPos, ((SMachineId*)TARRAY_GET_ELEM(pCluster->pMachineIds, i))->id, TSDB_MACHINE_ID_LEN, _OVER) - } + SDB_SET_INT64(pRaw, dataPos, pCluster->grantedTime, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pCluster->active, TSDB_UNIQ_ACTIVE_KEY_LEN, _OVER) SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER); @@ -235,17 +215,8 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &pCluster->upTime, _OVER) if (sver > 1) { - int16_t nMachineIds = 0; - SDB_GET_INT16(pRaw, dataPos, &nMachineIds, _OVER) - if (nMachineIds > 0) { - pCluster->pMachineIds = taosArrayInit(nMachineIds, sizeof(SMachineId)); - if (!pCluster->pMachineIds) goto _OVER; - for (int16_t i = 0; i < nMachineIds; ++i) { - SDB_GET_BINARY(pRaw, dataPos, ((SMachineId *)TARRAY_GET_ELEM(pCluster->pMachineIds, i))->id, - TSDB_MACHINE_ID_LEN, _OVER) - ++TARRAY_SIZE(pCluster->pMachineIds); - } - } + SDB_GET_INT64(pRaw, dataPos, &pCluster->grantedTime, _OVER) + SDB_GET_BINARY(pRaw, dataPos, pCluster->active, TSDB_UNIQ_ACTIVE_KEY_LEN, _OVER) } SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER) @@ -255,7 +226,6 @@ _OVER: if (terrno != 0) { mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster == NULL ? 0 : pCluster->id, pRaw, terrstr()); - mndFreeClusterObj(pCluster); taosMemoryFreeClear(pRow); return NULL; } @@ -273,7 +243,6 @@ static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) { static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) { mTrace("cluster:%" PRId64 ", perform delete action, row:%p", pCluster->id, pCluster); - mndFreeClusterObj(pCluster); return 0; } @@ -299,23 +268,10 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { clusterObj.id = mndGenerateUid(clusterObj.name, TSDB_CLUSTER_ID_LEN); clusterObj.id = (clusterObj.id >= 0 ? clusterObj.id : -clusterObj.id); pMnode->clusterId = clusterObj.id; - clusterObj.pMachineIds = taosArrayInit(1, sizeof(SMachineId)); - if(!clusterObj.pMachineIds) { - return -1; - } - char *machineId = grantGetMachineId(); - if (machineId) { - taosArrayPush(clusterObj.pMachineIds, machineId); - taosMemoryFree(machineId); - } - mInfo("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name); SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj); - if (pRaw == NULL) { - mndFreeClusterObj(&clusterObj); - return -1; - } + if (pRaw == NULL) return -1; (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); mInfo("cluster:%" PRId64 ", will be created when deploying, raw:%p", clusterObj.id, pRaw); @@ -323,7 +279,6 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, "create-cluster"); if (pTrans == NULL) { sdbFreeRaw(pRaw); - mndFreeClusterObj(&clusterObj); mError("cluster:%" PRId64 ", failed to create since %s", clusterObj.id, terrstr()); return -1; } @@ -332,7 +287,6 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { if (mndTransAppendCommitlog(pTrans, pRaw) != 0) { mError("trans:%d, failed to commit redo log since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); - mndFreeClusterObj(&clusterObj); return -1; } (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); @@ -340,12 +294,10 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); - mndFreeClusterObj(&clusterObj); return -1; } mndTransDrop(pTrans); - mndFreeClusterObj(&clusterObj); return 0; } @@ -389,31 +341,15 @@ static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock * colDataSetVal(pColInfo, numOfRows, (const char *)&tsExpireTime, false); } +#ifdef TD_ENTERPRISE pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - int32_t nMachidIds = taosArrayGetSize(pCluster->pMachineIds); - char *pBuf = - taosMemoryCalloc(1, (nMachidIds > 0 ? nMachidIds * (TSDB_MACHINE_ID_LEN + 3) : 1) + 2 + VARSTR_HEADER_SIZE); - VarDataLenT nPos = 0; - if (pBuf) { - nPos += VARSTR_HEADER_SIZE; - snprintf(pBuf + nPos, 2, "["); - ++nPos; - for (int32_t i = 0; i < nMachidIds; ++i) { - snprintf(pBuf + nPos, TSDB_MACHINE_ID_LEN + 2, "\"%s", - ((SMachineId *)TARRAY_GET_ELEM(pCluster->pMachineIds, i))->id); - nPos += TSDB_MACHINE_ID_LEN + 1; - snprintf(pBuf + nPos, 3, "\","); - nPos += 2; - } - if (nMachidIds > 0) --nPos; - snprintf(pBuf + nPos, 2, "]"); - ++nPos; - *(VarDataLenT *)(pBuf) = nPos - VARSTR_HEADER_SIZE; - colDataSetVal(pColInfo, numOfRows, pBuf, false); - taosMemoryFree(pBuf); - } else { - colDataSetNULL(pColInfo, numOfRows); - } + colDataSetVal(pColInfo, numOfRows, (const char *)&pCluster->grantedTime, false); + + char active[TSDB_UNIQ_ACTIVE_KEY_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(active, pCluster->active, pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&active, false); +#endif sdbRelease(pSdb, pCluster); numOfRows++; @@ -434,33 +370,24 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) { void *pIter = NULL; SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter); if (pCluster != NULL) { - if(mndDupClusterObj(pCluster, &clusterObj) != 0){ - mndReleaseCluster(pMnode, pCluster, pIter); - mndFreeClusterObj(&clusterObj); - return -1; - } + memcpy(&clusterObj, pCluster, sizeof(SClusterObj)); clusterObj.upTime += tsUptimeInterval; mndReleaseCluster(pMnode, pCluster, pIter); } if (clusterObj.id <= 0) { mError("can't get cluster info while update uptime"); - mndFreeClusterObj(&clusterObj); return 0; } mInfo("update cluster uptime to %d", clusterObj.upTime); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-uptime"); - if (pTrans == NULL) { - mndFreeClusterObj(&clusterObj); - return -1; - } + if (pTrans == NULL) return -1; SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); - mndFreeClusterObj(&clusterObj); return -1; } (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); @@ -468,102 +395,9 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) { if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); - mndFreeClusterObj(&clusterObj); return -1; } mndTransDrop(pTrans); - mndFreeClusterObj(&clusterObj); - return 0; -} - -int32_t mndDupClusterObj(SClusterObj *pOld, SClusterObj *pNew) { - memcpy(pNew, pOld, sizeof(SClusterObj)); - pNew->updateTime = taosGetTimestampMs(); - pNew->pMachineIds = taosArrayInit(taosArrayGetSize(pOld->pMachineIds), sizeof(SMachineId)); - if (!pNew->pMachineIds) return -1; - taosArrayAddAll(pNew->pMachineIds, pOld->pMachineIds); - return 0; -} - -void mndFreeClusterObj(SClusterObj *pCluster) { - if (pCluster) { - pCluster->pMachineIds = taosArrayDestroy(pCluster->pMachineIds); - } -} - -int32_t mndProcessClusterMachineIds(SMnode *pMnode, SMachineId *pIds, int32_t nIds) { - SClusterObj clusterObj = {0}; - void *pIter = NULL; - SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter); - if (pCluster != NULL) { - int32_t nDups = 0; - int32_t size = taosArrayGetSize(pCluster->pMachineIds); - for (int32_t n = 0; n < nIds; ++n) { - bool exist = false; - for (int32_t i = 0; i < size; ++i) { - SMachineId *pId = TARRAY_GET_ELEM(pCluster->pMachineIds, i); - if (0 == strncmp(pId->id, (pIds + nIds)->id, TSDB_MACHINE_ID_LEN + 1)) { - exist = true; - ++nDups; - break; - } - } - if (!exist) { - if (!clusterObj.pMachineIds) { - if(mndDupClusterObj(pCluster, &clusterObj) != 0){ - mndReleaseCluster(pMnode, pCluster, pIter); - mndFreeClusterObj(&clusterObj); - return -1; - } - } - if (!taosArrayPush(clusterObj.pMachineIds, pIds + n)) { - mndReleaseCluster(pMnode, pCluster, pIter); - mndFreeClusterObj(&clusterObj); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - } - } - - if (nDups == nIds) { - mndReleaseCluster(pMnode, pCluster, pIter); - mndFreeClusterObj(&clusterObj); - return 0; - } - - mndReleaseCluster(pMnode, pCluster, pIter); - } - - if (clusterObj.id <= 0) { - mError("can't get cluster info while process machine-id"); - mndFreeClusterObj(&clusterObj); - return -1; - } - - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "machine-id"); - if (pTrans == NULL) { - mndFreeClusterObj(&clusterObj); - return -1; - } - - SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - mndFreeClusterObj(&clusterObj); - return -1; - } - (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - mndFreeClusterObj(&clusterObj); - return -1; - } - - mndTransDrop(pTrans); - mndFreeClusterObj(&clusterObj); return 0; } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index f1ed05183b..e51dbde69b 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -58,6 +58,11 @@ enum { DND_DROP, }; +typedef struct { + char machineId[TSDB_MACHINE_ID_LEN + 1]; + tsem_t sem; +} SMachineInfo; + static int32_t mndCreateDefaultDnode(SMnode *pMnode); static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode); static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw); @@ -71,6 +76,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq); static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq); static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq); static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp); +static int32_t mndProcessGetMachineRsp(SRpcMsg *pRsp); static int32_t mndProcessStatusReq(SRpcMsg *pReq); static int32_t mndProcessNotifyReq(SRpcMsg *pReq); static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq); @@ -104,6 +110,7 @@ int32_t mndInitDnode(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq); mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp); + mndSetMsgHandle(pMnode, TDMT_MND_GET_MACHINE_RSP, mndProcessGetMachineRsp); mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq); mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq); mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq); @@ -734,10 +741,24 @@ _OVER: return code; } +static int32_t mndSendGetMachineToDnode(SMnode *pMnode, SDnodeObj *pObj, SMachineInfo *pInfo) { + SRpcMsg rpcMsg = {.pCont = NULL, .contLen = 0, .msgType = TDMT_MND_GET_MACHINE, .info.ahandle = pInfo}; + + mDebug("send get machine msg to dnode:%d %s", pObj->id, pObj->ep); + + SEpSet epSet = {.numOfEps = 1}; + strncpy(epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN); + epSet.eps[0].port = pObj->port; + tmsgSendReq(&epSet, &rpcMsg); + + return TSDB_CODE_SUCCESS; +} + static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) { - int32_t code = -1; - SSdbRaw *pRaw = NULL; - STrans *pTrans = NULL; + int32_t code = -1; + SSdbRaw *pRaw = NULL; + STrans *pTrans = NULL; + SMachineInfo *pInfo = NULL; SDnodeObj dnodeObj = {0}; dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE); @@ -746,11 +767,14 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC dnodeObj.port = pCreate->port; tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN); snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port); - char *machineId = grantGetMachineId(); - if (machineId) { - memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN); + if (!(pInfo = taosMemoryCalloc(1, sizeof(*pInfo)))) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; } - + tsem_init(&pInfo->sem, 0, 0); + mndSendGetMachineToDnode(pMnode, &dnodeObj, pInfo); + tsem_wait(&pInfo->sem); + memcpy(dnodeObj.machineId, pInfo->machineId, TSDB_MACHINE_ID_LEN); pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode"); if (pTrans == NULL) goto _OVER; mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep); @@ -768,6 +792,10 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC _OVER: mndTransDrop(pTrans); sdbFreeRaw(pRaw); + if(pInfo) { + tsem_destroy(&pInfo->sem); + taosMemoryFree(pInfo); + } return code; } @@ -1239,6 +1267,21 @@ static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) { return 0; } +static int32_t mndProcessGetMachineRsp(SRpcMsg *pRsp) { + if (pRsp->code != 0 || pRsp->contLen != TSDB_MACHINE_ID_LEN || pRsp->pCont == NULL) { + mError("failed to get machine since %s", tstrerror(pRsp->code ? pRsp->code : TSDB_CODE_INVALID_MSG)); + return -1; + } + + SMachineInfo *pInfo = pRsp->info.ahandle; + if (pInfo) { + memcpy(pInfo->machineId, pRsp->pCont, TSDB_MACHINE_ID_LEN); + tsem_post(&pInfo->sem); + } + + return 0; +} + static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; int32_t totalRows = 0; @@ -1349,11 +1392,7 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB taosMemoryFreeClear(b); #ifdef TD_ENTERPRISE - STR_TO_VARSTR(buf, pDnode->active); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, buf, false); - - STR_TO_VARSTR(buf, pDnode->connActive); + STR_TO_VARSTR(buf, pDnode->machineId); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, buf, false); #endif