feat: support uniq grant

This commit is contained in:
kailixu 2024-01-24 19:08:19 +08:00
parent e13e61f4d5
commit 4d345b819a
13 changed files with 23 additions and 131 deletions

View File

@ -57,9 +57,8 @@ typedef enum {
TSDB_GRANT_BACKUP_RESTORE_EXPIRE,
} EGrantType;
int32_t grantCheck(EGrantType grant);
char* grantGetMachineId();
char* tGetMachineId();
#ifndef GRANTS_CFG
#ifdef TD_ENTERPRISE

View File

@ -1564,9 +1564,11 @@ typedef struct {
int64_t updateTime;
float numOfCores;
int32_t numOfSupportVnodes;
int32_t numOfDiskCfg;
int64_t memTotal;
int64_t memAvail;
char dnodeEp[TSDB_EP_LEN];
char machineId[TSDB_MACHINE_ID_LEN + 1];
SMnodeLoad mload;
SQnodeLoad qload;
SClusterCfg clusterCfg;

View File

@ -164,7 +164,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_GET_MACHINE, "get-machine", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_UNUSED1, "unused", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp)

View File

@ -1163,9 +1163,11 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pReq->updateTime) < 0) return -1;
if (tEncodeFloat(&encoder, pReq->numOfCores) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfSupportVnodes) < 0) return -1;
if (tEncodeI32v(&encoder, pReq->numOfDiskCfg) < 0) return -1;
if (tEncodeI64(&encoder, pReq->memTotal) < 0) return -1;
if (tEncodeI64(&encoder, pReq->memAvail) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->dnodeEp) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->machineId) < 0) return -1;
// cluster cfg
if (tEncodeI32(&encoder, pReq->clusterCfg.statusInterval) < 0) return -1;
@ -1253,9 +1255,11 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &pReq->updateTime) < 0) return -1;
if (tDecodeFloat(&decoder, &pReq->numOfCores) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfSupportVnodes) < 0) return -1;
if (tDecodeI32v(&decoder, &pReq->numOfDiskCfg) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->memTotal) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->memAvail) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->dnodeEp) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->machineId) < 0) return -1;
// cluster cfg
if (tDecodeI32(&decoder, &pReq->clusterCfg.statusInterval) < 0) return -1;

View File

@ -56,7 +56,6 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg);
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg);
int32_t dmProcessGetMachine(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
// dmWorker.c
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);

View File

@ -114,9 +114,15 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
req.updateTime = pMgmt->pData->updateTime;
req.numOfCores = tsNumOfCores;
req.numOfSupportVnodes = tsNumOfSupportVnodes;
req.numOfDiskCfg = tsDiskCfgNum;
req.memTotal = tsTotalMemoryKB * 1024;
req.memAvail = req.memTotal - tsRpcQueueMemoryAllowed - 16 * 1024 * 1024;
tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
char *machine = tGetMachineId();
if (machine) {
tstrncpy(req.machineId, machine, TSDB_MACHINE_ID_LEN + 1);
taosMemoryFreeClear(machine);
}
req.clusterCfg.statusInterval = tsStatusInterval;
req.clusterCfg.checkTime = 0;
@ -414,33 +420,6 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t dmProcessGetMachine(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t size = TSDB_MACHINE_ID_LEN;
terrno = 0;
void *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
dError("failed to retrieve data since %s", terrstr());
return -1;
}
char *machineId = grantGetMachineId();
if (machineId) {
memcpy(pRsp, machineId, TSDB_MACHINE_ID_LEN);
taosMemoryFreeClear(machineId);
} else {
terrno = TSDB_CODE_INVALID_DATA_FMT;
rpcFreeCont(pRsp);
pRsp = NULL;
size = 0;
}
pMsg->code = terrno;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = size;
return TSDB_CODE_SUCCESS;
}
SArray *dmGetMsgHandles() {
int32_t code = -1;
SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
@ -463,7 +442,6 @@ SArray *dmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_MACHINE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
code = 0;

View File

@ -348,9 +348,6 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
case TDMT_MND_GRANT_NOTIFY:
code = dmProcessGrantNotify(NULL, pMsg);
break;
case TDMT_MND_GET_MACHINE:
code = dmProcessGetMachine(pMgmt, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dGError("msg:%p, not processed in mgmt queue", pMsg);

View File

@ -163,8 +163,6 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_PAUSE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RESUME_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_MACHINE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_WHITELIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -210,6 +210,7 @@ typedef struct {
int32_t numOfVnodes;
int32_t numOfOtherNodes;
int32_t numOfSupportVnodes;
int32_t numOfDiskCfg;
float numOfCores;
int64_t memTotal;
int64_t memAvail;

View File

@ -107,7 +107,6 @@ typedef struct {
typedef struct SMnode {
int32_t selfDnodeId;
int32_t refMgmt;
int64_t clusterId;
TdThread thread;
TdThreadRwlock lock;

View File

@ -58,13 +58,6 @@ enum {
DND_DROP,
};
typedef struct {
SMnodeRefInfo refInfo;
int64_t refId;
tsem_t sem;
char machineId[TSDB_MACHINE_ID_LEN + 1];
} SMachineInfo;
static int32_t mndCreateDefaultDnode(SMnode *pMnode);
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
@ -78,7 +71,6 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessGetMachineRsp(SRpcMsg *pRsp);
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
@ -112,7 +104,6 @@ int32_t mndInitDnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
mndSetMsgHandle(pMnode, TDMT_MND_GET_MACHINE_RSP, mndProcessGetMachineRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
@ -145,7 +136,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
tstrncpy(dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
dnodeObj.fqdn[TSDB_FQDN_LEN - 1] = 0;
snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", tsLocalFqdn, tsServerPort);
char *machineId = grantGetMachineId();
char *machineId = tGetMachineId();
if (machineId) {
memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
taosMemoryFreeClear(machineId);
@ -677,8 +668,12 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pDnode->rebootTime = statusReq.rebootTime;
pDnode->numOfCores = statusReq.numOfCores;
pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
pDnode->numOfDiskCfg = statusReq.numOfDiskCfg;
pDnode->memAvail = statusReq.memAvail;
pDnode->memTotal = statusReq.memTotal;
if (pDnode->machineId[0] == 0 && statusReq.machineId[0] != 0) {
tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1);
}
SStatusRsp statusRsp = {0};
statusRsp.statusSeq++;
@ -748,25 +743,10 @@ _OVER:
return code;
}
static int32_t mndSendGetMachineToDnode(SMnode *pMnode, SDnodeObj *pObj, SMachineInfo *pInfo) {
SRpcMsg rpcMsg = {.pCont = NULL, .contLen = 0, .msgType = TDMT_MND_GET_MACHINE, .info.ahandle = (void*)pInfo->refId};
SEpSet epSet = {.numOfEps = 1};
strncpy(epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
epSet.eps[0].port = pObj->port;
return tmsgSendReq(&epSet, &rpcMsg);
}
static void mndDestroyMachineInfo(void *pInfo) {
if (pInfo) {
tsem_destroy(&((SMachineInfo *)pInfo)->sem);
}
}
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
int32_t code = -1;
SSdbRaw *pRaw = NULL;
STrans *pTrans = NULL;
SMachineInfo *pInfo = NULL;
SDnodeObj dnodeObj = {0};
dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
@ -775,29 +755,6 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
dnodeObj.port = pCreate->port;
tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);
#if 0
if (!(pInfo = taosMemoryCalloc(1, sizeof(*pInfo)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
pInfo->refInfo.freeFp = mndDestroyMachineInfo;
tsem_init(&pInfo->sem, 0, 0);
if((pInfo->refId = taosAddRef(pMnode->refMgmt, pInfo)) < 0) {
goto _OVER;
}
if ((terrno = mndSendGetMachineToDnode(pMnode, &dnodeObj, pInfo)) != 0) {
goto _OVER;
}
if (tsem_timewait(&pInfo->sem, 1000) < 0) {
terrno = TSDB_CODE_DNODE_OFFLINE;
goto _OVER;
}
if (strlen(pInfo->machineId) == TSDB_MACHINE_ID_LEN) {
memcpy(dnodeObj.machineId, pInfo->machineId, TSDB_MACHINE_ID_LEN);
} else {
mWarn("Invalid machineId:%s to create dnode:%s", pInfo->machineId, dnodeObj.fqdn);
}
#endif
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
if (pTrans == NULL) goto _OVER;
mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
@ -815,9 +772,6 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
_OVER:
mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
if (pInfo) {
taosRemoveRef(pMnode->refMgmt, pInfo->refId);
}
return code;
}
@ -1318,27 +1272,6 @@ static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
return 0;
}
static int32_t mndProcessGetMachineRsp(SRpcMsg *pRsp) {
if (pRsp->code != 0 || pRsp->contLen != TSDB_MACHINE_ID_LEN || pRsp->pCont == NULL) {
mError("failed to get machine since %s", tstrerror(pRsp->code ? pRsp->code : TSDB_CODE_INVALID_MSG));
return -1;
}
SMnode *pMnode = pRsp->info.node;
int64_t refId = (int64_t)pRsp->info.ahandle;
if (pMnode) {
SMachineInfo *pInfo = taosAcquireRef(pMnode->refMgmt, refId);
if (pInfo) {
memcpy(pInfo->machineId, pRsp->pCont, TSDB_MACHINE_ID_LEN);
tsem_post(&pInfo->sem);
taosReleaseRef(pMnode->refMgmt, refId);
}
}
return 0;
}
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
int32_t totalRows = 0;

View File

@ -75,7 +75,7 @@ void grantParseParameter() { mError("can't parsed parameter k"); }
void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value) {}
void grantAdd(EGrantType grant, uint64_t value) {}
void grantRestore(EGrantType grant, uint64_t value) {}
char *grantGetMachineId(){return NULL};
char *tGetMachineId(){return NULL};
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }

View File

@ -193,7 +193,7 @@ static void mndPullupGrant(SMnode *pMnode) {
if (pReq != NULL) {
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}
@ -596,14 +596,6 @@ static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
memcpy(pMnode->syncMgmt.nodeRoles, pOption->nodeRoles, sizeof(pOption->nodeRoles));
}
static void mndDestroyRefInfo(void *pInfo) {
SMnodeRefInfo *pRefInfo = pInfo;
if (pRefInfo && pRefInfo->freeFp) {
(*pRefInfo->freeFp)(pInfo);
}
taosMemoryFree(pInfo);
}
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
mInfo("start to open mnode in %s", path);
@ -628,16 +620,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
return NULL;
}
int32_t code = 0;
if ((pMnode->refMgmt = taosOpenRef(200, mndDestroyRefInfo)) < 0) {
code = terrno;
mError("failed to open mnode since %s", terrstr());
mndClose(pMnode);
terrno = code;
return NULL;
}
code = mndCreateDir(pMnode, path);
int32_t code = mndCreateDir(pMnode, path);
if (code != 0) {
code = terrno;
mError("failed to open mnode since %s", terrstr());
@ -680,7 +663,6 @@ void mndClose(SMnode *pMnode) {
if (pMnode != NULL) {
mInfo("start to close mnode");
mndCleanupSteps(pMnode, -1);
taosCloseRef(pMnode->refMgmt);
taosMemoryFreeClear(pMnode->path);
taosMemoryFreeClear(pMnode);
mInfo("mnode is closed");