feat: support machine id

This commit is contained in:
kailixu 2023-12-01 18:15:47 +08:00
parent 386af392ea
commit 042291c35b
6 changed files with 158 additions and 7 deletions

View File

@ -52,6 +52,7 @@ typedef enum {
int32_t grantCheck(EGrantType grant);
int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, char* out, int8_t type);
char* grantGetMachineId();
#ifndef GRANTS_CFG
#ifdef TD_ENTERPRISE

View File

@ -264,6 +264,7 @@ typedef enum ELogicConditionType {
#define TSDB_JOB_STATUS_LEN 32
#define TSDB_CLUSTER_ID_LEN 40
#define TSDB_MACHINE_ID_LEN 24
#define TSDB_FQDN_LEN 128
#define TSDB_EP_LEN (TSDB_FQDN_LEN + 6)
#define TSDB_IPv4ADDR_LEN 16

View File

@ -28,6 +28,9 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len);
int64_t mndGetClusterId(SMnode *pMnode);
int64_t mndGetClusterCreateTime(SMnode *pMnode);
int64_t mndGetClusterUpTime(SMnode *pMnode);
int32_t mndProcessClusterMachineIds(SMnode *pMnode, SMachineId *pIds, int32_t nIds);
int32_t mndDupClusterObj(SClusterObj *pOld, SClusterObj *pNew);
void mndFreeClusterObj(SClusterObj *pCluster);
#ifdef __cplusplus
}

View File

@ -186,12 +186,17 @@ typedef struct {
TdThreadMutex mutex;
} STrans;
typedef struct {
char id[TSDB_MACHINE_ID_LEN + 1];
} SMachineId;
typedef struct {
int64_t id;
char name[TSDB_CLUSTER_ID_LEN];
int64_t createdTime;
int64_t updateTime;
int32_t upTime;
SArray* pMachineIds;
} SClusterObj;
typedef struct {

View File

@ -18,7 +18,7 @@
#include "mndShow.h"
#include "mndTrans.h"
#define CLUSTER_VER_NUMBE 1
#define CLUSTER_VER_NUMBE 2
#define CLUSTER_RESERVE_SIZE 60
int64_t tsExpireTime = 0;
@ -137,7 +137,9 @@ int64_t mndGetClusterUpTime(SMnode *pMnode) {
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, CLUSTER_VER_NUMBE, sizeof(SClusterObj) + CLUSTER_RESERVE_SIZE);
int32_t nMachineIds = taosArrayGetSize(pCluster->pMachineIds);
int32_t machineSize = sizeof(int16_t) + nMachineIds * sizeof(SMachineId);
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, CLUSTER_VER_NUMBE, sizeof(SClusterObj) + machineSize + CLUSTER_RESERVE_SIZE);
if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0;
@ -146,7 +148,12 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, pCluster->upTime, _OVER)
SDB_SET_INT16(pRaw, dataPos, nMachineIds, _OVER)
for (int32_t i = 0; i < nMachineIds; ++i) {
SDB_SET_BINARY(pRaw, dataPos, ((SMachineId*)TARRAY_GET_ELEM(pCluster->pMachineIds, i))->id, TSDB_MACHINE_ID_LEN, _OVER)
}
SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER);
terrno = 0;
@ -164,12 +171,12 @@ _OVER:
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SClusterObj *pCluster = NULL;
SSdbRow *pRow = NULL;
SSdbRow *pRow = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != CLUSTER_VER_NUMBE) {
if (sver < 0 || sver > CLUSTER_VER_NUMBE) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER;
}
@ -186,6 +193,19 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pCluster->upTime, _OVER)
if (sver > 1) {
int16_t nMachineIds = 0;
SDB_GET_INT16(pRaw, dataPos, &nMachineIds, _OVER)
if (nMachineIds > 0) {
pCluster->pMachineIds = taosArrayInit(nMachineIds, sizeof(SMachineId));
if (!pCluster->pMachineIds) goto _OVER;
for (int16_t i = 0; i < nMachineIds; ++i) {
SDB_GET_BINARY(pRaw, dataPos, ((SMachineId *)TARRAY_GET_ELEM(pCluster->pMachineIds, i))->id,
TSDB_MACHINE_ID_LEN, _OVER)
}
}
}
SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
terrno = 0;
@ -194,6 +214,7 @@ _OVER:
if (terrno != 0) {
mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster == NULL ? 0 : pCluster->id, pRaw,
terrstr());
mndFreeClusterObj(pCluster);
taosMemoryFreeClear(pRow);
return NULL;
}
@ -236,10 +257,23 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
clusterObj.id = mndGenerateUid(clusterObj.name, TSDB_CLUSTER_ID_LEN);
clusterObj.id = (clusterObj.id >= 0 ? clusterObj.id : -clusterObj.id);
pMnode->clusterId = clusterObj.id;
clusterObj.pMachineIds = taosArrayInit(1, sizeof(SMachineId));
if(!clusterObj.pMachineIds) {
return -1;
}
char *machineId = grantGetMachineId();
if (machineId) {
taosArrayPush(clusterObj.pMachineIds, machineId);
taosMemoryFree(machineId);
}
mInfo("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name);
SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj);
if (pRaw == NULL) return -1;
if (pRaw == NULL) {
mndFreeClusterObj(&clusterObj);
return -1;
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mInfo("cluster:%" PRId64 ", will be created when deploying, raw:%p", clusterObj.id, pRaw);
@ -247,6 +281,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, "create-cluster");
if (pTrans == NULL) {
sdbFreeRaw(pRaw);
mndFreeClusterObj(&clusterObj);
mError("cluster:%" PRId64 ", failed to create since %s", clusterObj.id, terrstr());
return -1;
}
@ -255,6 +290,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
mError("trans:%d, failed to commit redo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return -1;
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
@ -262,10 +298,12 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return -1;
}
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return 0;
}
@ -328,24 +366,33 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster != NULL) {
memcpy(&clusterObj, pCluster, sizeof(SClusterObj));
if(mndDupClusterObj(pCluster, &clusterObj) != 0){
mndReleaseCluster(pMnode, pCluster, pIter);
mndFreeClusterObj(&clusterObj);
return -1;
}
clusterObj.upTime += tsUptimeInterval;
mndReleaseCluster(pMnode, pCluster, pIter);
}
if (clusterObj.id <= 0) {
mError("can't get cluster info while update uptime");
mndFreeClusterObj(&clusterObj);
return 0;
}
mInfo("update cluster uptime to %d", clusterObj.upTime);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-uptime");
if (pTrans == NULL) return -1;
if (pTrans == NULL) {
mndFreeClusterObj(&clusterObj);
return -1;
}
SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
@ -353,9 +400,102 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return -1;
}
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return 0;
}
int32_t mndDupClusterObj(SClusterObj *pOld, SClusterObj *pNew) {
memcpy(pNew, pOld, sizeof(SClusterObj));
pNew->updateTime = taosGetTimestampMs();
pNew->pMachineIds = taosArrayInit(taosArrayGetSize(pOld->pMachineIds), sizeof(SMachineId));
if (!pNew->pMachineIds) return -1;
taosArrayAddAll(pNew->pMachineIds, pOld->pMachineIds);
return 0;
}
void mndFreeClusterObj(SClusterObj *pCluster) {
if (pCluster) {
pCluster->pMachineIds = taosArrayDestroy(pCluster->pMachineIds);
}
}
int32_t mndProcessClusterMachineIds(SMnode *pMnode, SMachineId *pIds, int32_t nIds) {
SClusterObj clusterObj = {0};
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster != NULL) {
int32_t nDups = 0;
int32_t size = taosArrayGetSize(pCluster->pMachineIds);
for (int32_t n = 0; n < nIds; ++n) {
bool exist = false;
for (int32_t i = 0; i < size; ++i) {
SMachineId *pId = TARRAY_GET_ELEM(pCluster->pMachineIds, i);
if (0 == strncmp(pId->id, (pIds + nIds)->id, TSDB_MACHINE_ID_LEN + 1)) {
exist = true;
++nDups;
break;
}
}
if (!exist) {
if (!clusterObj.pMachineIds) {
if(mndDupClusterObj(pCluster, &clusterObj) != 0){
mndReleaseCluster(pMnode, pCluster, pIter);
mndFreeClusterObj(&clusterObj);
return -1;
}
}
if (!taosArrayPush(clusterObj.pMachineIds, pIds + n)) {
mndReleaseCluster(pMnode, pCluster, pIter);
mndFreeClusterObj(&clusterObj);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
}
if (nDups == nIds) {
mndReleaseCluster(pMnode, pCluster, pIter);
mndFreeClusterObj(&clusterObj);
return 0;
}
mndReleaseCluster(pMnode, pCluster, pIter);
}
if (clusterObj.id <= 0) {
mError("can't get cluster info while process machine-id");
mndFreeClusterObj(&clusterObj);
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "machine-id");
if (pTrans == NULL) {
mndFreeClusterObj(&clusterObj);
return -1;
}
SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return -1;
}
mndTransDrop(pTrans);
mndFreeClusterObj(&clusterObj);
return 0;
}

View File

@ -129,6 +129,7 @@ void grantParseParameter() { mError("can't parsed parameter k"); }
void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value) {}
void grantAdd(EGrantType grant, uint64_t value) {}
void grantRestore(EGrantType grant, uint64_t value) {}
char *grantGetMachineId(){return NULL};
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
int32_t grantAlterActiveCode(int32_t did, const char *old, const char *new, char *out, int8_t type) {