feat: support uniq grant

This commit is contained in:
kailixu 2023-12-07 14:11:41 +08:00
parent 780f3d9cad
commit 4adce7ed52
10 changed files with 109 additions and 205 deletions

View File

@ -166,7 +166,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_UNUSED1, "unused", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_MACHINE, "get-machine", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp)
@ -335,8 +335,9 @@ TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_PREP_SNAPSHOT, "sync-prep-snapshot", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_PREP_SNAPSHOT_REPLY, "sync-prep-snapshot-reply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_UNUSED_CODE, "sync-unused", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
TD_CLOSE_MSG_TYPE(TDMT_END_SYNC_MSG)

View File

@ -287,7 +287,7 @@ typedef enum ELogicConditionType {
#define TSDB_ACTIVE_KEY_LEN 109
#define TSDB_CONN_ACTIVE_KEY_LEN 255
#define TSDB_UNIQ_ACTIVE_KEY_LEN 255
#define TSDB_UNIQ_ACTIVE_KEY_LEN 256
#define TSDB_DEFAULT_PKT_SIZE 65480 // same as RPC_MAX_UDP_SIZE

View File

@ -75,7 +75,10 @@ static const SSysDbTableSchema clusterSchema[] = {
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "expire_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "machine_ids", .bytes = 100 * (TSDB_MACHINE_ID_LEN + 3) + 2 + VARSTR_HEADER_SIZE, .type= TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
#ifdef TD_ENTERPRISE
{.name = "granted_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "active_code", .bytes = TSDB_UNIQ_ACTIVE_KEY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
#endif
};
static const SSysDbTableSchema userDBSchema[] = {

View File

@ -54,6 +54,7 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg);
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg);
int32_t dmProcessGetMachine(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
// dmWorker.c
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);

View File

@ -409,6 +409,32 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t dmProcessGetMachine(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t size = TSDB_MACHINE_ID_LEN;
terrno = 0;
void *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
dError("failed to retrieve data since %s", terrstr());
return -1;
}
char *machineId = grantGetMachineId();
if (machineId && (strlen(machineId) == TSDB_MACHINE_ID_LEN)) {
memcpy(pRsp, machineId, TSDB_MACHINE_ID_LEN);
} else {
terrno = TSDB_CODE_INVALID_DATA_FMT;
rpcFreeCont(pRsp);
pRsp = NULL;
size = 0;
}
pMsg->code = terrno;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = size;
return TSDB_CODE_SUCCESS;
}
SArray *dmGetMsgHandles() {
int32_t code = -1;
SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
@ -425,11 +451,13 @@ SArray *dmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
// Requests handled by MNODE
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_MACHINE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
code = 0;

View File

@ -303,6 +303,9 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
case TDMT_MND_GRANT_NOTIFY:
code = dmProcessGrantNotify(NULL, pMsg);
break;
case TDMT_MND_GET_MACHINE:
code = dmProcessGetMachine(pMgmt, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dGError("msg:%p, not processed in mgmt queue", pMsg);

View File

@ -30,10 +30,6 @@ int64_t mndGetClusterCreateTime(SMnode *pMnode);
int64_t mndGetClusterUpTime(SMnode *pMnode);
int32_t mndGetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo);
int32_t mndGetClusterActive(SMnode *pMnode, char* active);
int32_t mndGetClusterMachineIds(SMnode *pMnode, SArray *pIds);
int32_t mndProcessClusterMachineIds(SMnode *pMnode, SMachineId *pIds, int32_t nIds);
int32_t mndDupClusterObj(SClusterObj *pOld, SClusterObj *pNew);
void mndFreeClusterObj(SClusterObj *pCluster);
#ifdef __cplusplus
}

View File

@ -197,8 +197,7 @@ typedef struct {
int64_t updateTime;
int32_t upTime;
int64_t grantedTime;
SArray* pMachineIds;
char active[TSDB_UNIQ_ACTIVE_KEY_LEN + 1];
char active[TSDB_UNIQ_ACTIVE_KEY_LEN];
} SClusterObj;
typedef struct {

View File

@ -116,7 +116,7 @@ int32_t mndGetClusterActive(SMnode *pMnode, char *active) {
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster) {
if (active) strncpy(active, pCluster->active, TSDB_UNIQ_ACTIVE_KEY_LEN + 1);
if (active) strncpy(active, pCluster->active, TSDB_UNIQ_ACTIVE_KEY_LEN);
mndReleaseCluster(pMnode, pCluster, pIter);
return 0;
}
@ -124,22 +124,6 @@ int32_t mndGetClusterActive(SMnode *pMnode, char *active) {
return -1;
}
int32_t mndGetClusterMachineIds(SMnode *pMnode, SArray *pIds) {
int32_t code = -1;
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster) {
if (!pIds) pIds = taosArrayInit(taosArrayGetSize(pCluster->pMachineIds), sizeof(SMachineId));
if (pIds) {
taosArrayAddAll(pIds, pCluster->pMachineIds);
code = 0;
}
mndReleaseCluster(pMnode, pCluster, pIter);
}
return code;
}
int64_t mndGetClusterCreateTime(SMnode *pMnode) {
int64_t createTime = 0;
void *pIter = NULL;
@ -177,9 +161,7 @@ int64_t mndGetClusterUpTime(SMnode *pMnode) {
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int16_t nMachineIds = taosArrayGetSize(pCluster->pMachineIds);
int32_t machineSize = sizeof(int16_t) + nMachineIds * sizeof(SMachineId);
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, CLUSTER_VER_NUMBE, sizeof(SClusterObj) + machineSize + CLUSTER_RESERVE_SIZE);
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, CLUSTER_VER_NUMBE, sizeof(SClusterObj) + CLUSTER_RESERVE_SIZE);
if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0;
@ -188,10 +170,8 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, pCluster->upTime, _OVER)
SDB_SET_INT16(pRaw, dataPos, nMachineIds, _OVER)
for (int16_t i = 0; i < nMachineIds; ++i) {
SDB_SET_BINARY(pRaw, dataPos, ((SMachineId*)TARRAY_GET_ELEM(pCluster->pMachineIds, i))->id, TSDB_MACHINE_ID_LEN, _OVER)
}
SDB_SET_INT64(pRaw, dataPos, pCluster->grantedTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pCluster->active, TSDB_UNIQ_ACTIVE_KEY_LEN, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER);
@ -235,17 +215,8 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pCluster->upTime, _OVER)
if (sver > 1) {
int16_t nMachineIds = 0;
SDB_GET_INT16(pRaw, dataPos, &nMachineIds, _OVER)
if (nMachineIds > 0) {
pCluster->pMachineIds = taosArrayInit(nMachineIds, sizeof(SMachineId));
if (!pCluster->pMachineIds) goto _OVER;
for (int16_t i = 0; i < nMachineIds; ++i) {
SDB_GET_BINARY(pRaw, dataPos, ((SMachineId *)TARRAY_GET_ELEM(pCluster->pMachineIds, i))->id,
TSDB_MACHINE_ID_LEN, _OVER)
++TARRAY_SIZE(pCluster->pMachineIds);
}
}
SDB_GET_INT64(pRaw, dataPos, &pCluster->grantedTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pCluster->active, TSDB_UNIQ_ACTIVE_KEY_LEN, _OVER)
}
SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
@ -255,7 +226,6 @@ _OVER:
if (terrno != 0) {
mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster == NULL ? 0 : pCluster->id, pRaw,
terrstr());
mndFreeClusterObj(pCluster);
taosMemoryFreeClear(pRow);
return NULL;
}
@ -273,7 +243,6 @@ static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) {
static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) {
mTrace("cluster:%" PRId64 ", perform delete action, row:%p", pCluster->id, pCluster);
mndFreeClusterObj(pCluster);
return 0;
}
@ -299,23 +268,10 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
clusterObj.id = mndGenerateUid(clusterObj.name, TSDB_CLUSTER_ID_LEN);
clusterObj.id = (clusterObj.id >= 0 ? clusterObj.id : -clusterObj.id);
pMnode->clusterId = clusterObj.id;
clusterObj.pMachineIds = taosArrayInit(1, sizeof(SMachineId));
if(!clusterObj.pMachineIds) {
return -1;
}
char *machineId = grantGetMachineId();
if (machineId) {
taosArrayPush(clusterObj.pMachineIds, machineId);
taosMemoryFree(machineId);
}
mInfo("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name);
SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj);
if (pRaw == NULL) {
mndFreeClusterObj(&clusterObj);
return -1;
}
if (pRaw == NULL) return -1;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mInfo("cluster:%" PRId64 ", will be created when deploying, raw:%p", clusterObj.id, pRaw);
@ -323,7 +279,6 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, "create-cluster");
if (pTrans == NULL) {
sdbFreeRaw(pRaw);
mndFreeClusterObj(&clusterObj);
mError("cluster:%" PRId64 ", failed to create since %s", clusterObj.id, terrstr());
return -1;
}
@ -332,7 +287,6 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
mError("trans:%d, failed to commit redo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return -1;
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
@ -340,12 +294,10 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return -1;
}
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return 0;
}
@ -389,31 +341,15 @@ static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *
colDataSetVal(pColInfo, numOfRows, (const char *)&tsExpireTime, false);
}
#ifdef TD_ENTERPRISE
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
int32_t nMachidIds = taosArrayGetSize(pCluster->pMachineIds);
char *pBuf =
taosMemoryCalloc(1, (nMachidIds > 0 ? nMachidIds * (TSDB_MACHINE_ID_LEN + 3) : 1) + 2 + VARSTR_HEADER_SIZE);
VarDataLenT nPos = 0;
if (pBuf) {
nPos += VARSTR_HEADER_SIZE;
snprintf(pBuf + nPos, 2, "[");
++nPos;
for (int32_t i = 0; i < nMachidIds; ++i) {
snprintf(pBuf + nPos, TSDB_MACHINE_ID_LEN + 2, "\"%s",
((SMachineId *)TARRAY_GET_ELEM(pCluster->pMachineIds, i))->id);
nPos += TSDB_MACHINE_ID_LEN + 1;
snprintf(pBuf + nPos, 3, "\",");
nPos += 2;
}
if (nMachidIds > 0) --nPos;
snprintf(pBuf + nPos, 2, "]");
++nPos;
*(VarDataLenT *)(pBuf) = nPos - VARSTR_HEADER_SIZE;
colDataSetVal(pColInfo, numOfRows, pBuf, false);
taosMemoryFree(pBuf);
} else {
colDataSetNULL(pColInfo, numOfRows);
}
colDataSetVal(pColInfo, numOfRows, (const char *)&pCluster->grantedTime, false);
char active[TSDB_UNIQ_ACTIVE_KEY_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(active, pCluster->active, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&active, false);
#endif
sdbRelease(pSdb, pCluster);
numOfRows++;
@ -434,33 +370,24 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster != NULL) {
if(mndDupClusterObj(pCluster, &clusterObj) != 0){
mndReleaseCluster(pMnode, pCluster, pIter);
mndFreeClusterObj(&clusterObj);
return -1;
}
memcpy(&clusterObj, pCluster, sizeof(SClusterObj));
clusterObj.upTime += tsUptimeInterval;
mndReleaseCluster(pMnode, pCluster, pIter);
}
if (clusterObj.id <= 0) {
mError("can't get cluster info while update uptime");
mndFreeClusterObj(&clusterObj);
return 0;
}
mInfo("update cluster uptime to %d", clusterObj.upTime);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-uptime");
if (pTrans == NULL) {
mndFreeClusterObj(&clusterObj);
return -1;
}
if (pTrans == NULL) return -1;
SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
@ -468,102 +395,9 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return -1;
}
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return 0;
}
int32_t mndDupClusterObj(SClusterObj *pOld, SClusterObj *pNew) {
memcpy(pNew, pOld, sizeof(SClusterObj));
pNew->updateTime = taosGetTimestampMs();
pNew->pMachineIds = taosArrayInit(taosArrayGetSize(pOld->pMachineIds), sizeof(SMachineId));
if (!pNew->pMachineIds) return -1;
taosArrayAddAll(pNew->pMachineIds, pOld->pMachineIds);
return 0;
}
void mndFreeClusterObj(SClusterObj *pCluster) {
if (pCluster) {
pCluster->pMachineIds = taosArrayDestroy(pCluster->pMachineIds);
}
}
int32_t mndProcessClusterMachineIds(SMnode *pMnode, SMachineId *pIds, int32_t nIds) {
SClusterObj clusterObj = {0};
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster != NULL) {
int32_t nDups = 0;
int32_t size = taosArrayGetSize(pCluster->pMachineIds);
for (int32_t n = 0; n < nIds; ++n) {
bool exist = false;
for (int32_t i = 0; i < size; ++i) {
SMachineId *pId = TARRAY_GET_ELEM(pCluster->pMachineIds, i);
if (0 == strncmp(pId->id, (pIds + nIds)->id, TSDB_MACHINE_ID_LEN + 1)) {
exist = true;
++nDups;
break;
}
}
if (!exist) {
if (!clusterObj.pMachineIds) {
if(mndDupClusterObj(pCluster, &clusterObj) != 0){
mndReleaseCluster(pMnode, pCluster, pIter);
mndFreeClusterObj(&clusterObj);
return -1;
}
}
if (!taosArrayPush(clusterObj.pMachineIds, pIds + n)) {
mndReleaseCluster(pMnode, pCluster, pIter);
mndFreeClusterObj(&clusterObj);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
}
if (nDups == nIds) {
mndReleaseCluster(pMnode, pCluster, pIter);
mndFreeClusterObj(&clusterObj);
return 0;
}
mndReleaseCluster(pMnode, pCluster, pIter);
}
if (clusterObj.id <= 0) {
mError("can't get cluster info while process machine-id");
mndFreeClusterObj(&clusterObj);
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "machine-id");
if (pTrans == NULL) {
mndFreeClusterObj(&clusterObj);
return -1;
}
SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return -1;
}
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return 0;
}

View File

@ -58,6 +58,11 @@ enum {
DND_DROP,
};
typedef struct {
char machineId[TSDB_MACHINE_ID_LEN + 1];
tsem_t sem;
} SMachineInfo;
static int32_t mndCreateDefaultDnode(SMnode *pMnode);
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
@ -71,6 +76,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessGetMachineRsp(SRpcMsg *pRsp);
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
@ -104,6 +110,7 @@ int32_t mndInitDnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
mndSetMsgHandle(pMnode, TDMT_MND_GET_MACHINE_RSP, mndProcessGetMachineRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
@ -734,10 +741,24 @@ _OVER:
return code;
}
static int32_t mndSendGetMachineToDnode(SMnode *pMnode, SDnodeObj *pObj, SMachineInfo *pInfo) {
SRpcMsg rpcMsg = {.pCont = NULL, .contLen = 0, .msgType = TDMT_MND_GET_MACHINE, .info.ahandle = pInfo};
mDebug("send get machine msg to dnode:%d %s", pObj->id, pObj->ep);
SEpSet epSet = {.numOfEps = 1};
strncpy(epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
epSet.eps[0].port = pObj->port;
tmsgSendReq(&epSet, &rpcMsg);
return TSDB_CODE_SUCCESS;
}
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
int32_t code = -1;
SSdbRaw *pRaw = NULL;
STrans *pTrans = NULL;
int32_t code = -1;
SSdbRaw *pRaw = NULL;
STrans *pTrans = NULL;
SMachineInfo *pInfo = NULL;
SDnodeObj dnodeObj = {0};
dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
@ -746,11 +767,14 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
dnodeObj.port = pCreate->port;
tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);
char *machineId = grantGetMachineId();
if (machineId) {
memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
if (!(pInfo = taosMemoryCalloc(1, sizeof(*pInfo)))) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
tsem_init(&pInfo->sem, 0, 0);
mndSendGetMachineToDnode(pMnode, &dnodeObj, pInfo);
tsem_wait(&pInfo->sem);
memcpy(dnodeObj.machineId, pInfo->machineId, TSDB_MACHINE_ID_LEN);
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
if (pTrans == NULL) goto _OVER;
mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
@ -768,6 +792,10 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
_OVER:
mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
if(pInfo) {
tsem_destroy(&pInfo->sem);
taosMemoryFree(pInfo);
}
return code;
}
@ -1239,6 +1267,21 @@ static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
return 0;
}
static int32_t mndProcessGetMachineRsp(SRpcMsg *pRsp) {
if (pRsp->code != 0 || pRsp->contLen != TSDB_MACHINE_ID_LEN || pRsp->pCont == NULL) {
mError("failed to get machine since %s", tstrerror(pRsp->code ? pRsp->code : TSDB_CODE_INVALID_MSG));
return -1;
}
SMachineInfo *pInfo = pRsp->info.ahandle;
if (pInfo) {
memcpy(pInfo->machineId, pRsp->pCont, TSDB_MACHINE_ID_LEN);
tsem_post(&pInfo->sem);
}
return 0;
}
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
int32_t totalRows = 0;
@ -1349,11 +1392,7 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
taosMemoryFreeClear(b);
#ifdef TD_ENTERPRISE
STR_TO_VARSTR(buf, pDnode->active);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, buf, false);
STR_TO_VARSTR(buf, pDnode->connActive);
STR_TO_VARSTR(buf, pDnode->machineId);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, buf, false);
#endif