feat: support uniq grant

This commit is contained in:
kailixu 2023-12-06 15:30:26 +08:00
parent e623c9c3c8
commit 67fd9993b0
9 changed files with 48 additions and 137 deletions

View File

@ -46,10 +46,17 @@ typedef enum {
TSDB_GRANT_SPEED,
TSDB_GRANT_QUERY_TIME,
TSDB_GRANT_CONNS,
TSDB_GRANT_STREAMS,
TSDB_GRANT_STREAM,
TSDB_GRANT_CPU_CORES,
TSDB_GRANT_STABLE,
TSDB_GRANT_TABLE,
TSDB_GRANT_TOPIC,
TSDB_GRANT_STREAM_EXPIRE,
TSDB_GRANT_TOPIC_EXPIRE,
TSDB_GRANT_AUDIT_EXPIRE,
TSDB_GRANT_MULTI_TIER_EXPIRE,
TSDB_GRANT_BACKUP_RESTORE_EXPIRE,
TSDB_GRANT_REPLICATION_EXPIRE,
} EGrantType;
typedef struct {
@ -58,7 +65,6 @@ typedef struct {
} SGrantedInfo;
int32_t grantCheck(EGrantType grant);
int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, char* out, int8_t type);
char* grantGetMachineId();
#ifndef GRANTS_CFG

View File

@ -287,6 +287,7 @@ typedef enum ELogicConditionType {
#define TSDB_ACTIVE_KEY_LEN 109
#define TSDB_CONN_ACTIVE_KEY_LEN 255
#define TSDB_UNIQ_ACTIVE_KEY_LEN 255
#define TSDB_DEFAULT_PKT_SIZE 65480 // same as RPC_MAX_UDP_SIZE

View File

@ -28,6 +28,8 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len);
int64_t mndGetClusterId(SMnode *pMnode);
int64_t mndGetClusterCreateTime(SMnode *pMnode);
int64_t mndGetClusterUpTime(SMnode *pMnode);
int32_t mndGetClusterActive(SMnode *pMnode, char* active);
int32_t mndGetClusterMachineIds(SMnode *pMnode, SArray *pIds);
int32_t mndProcessClusterMachineIds(SMnode *pMnode, SMachineId *pIds, int32_t nIds);
int32_t mndDupClusterObj(SClusterObj *pOld, SClusterObj *pNew);
void mndFreeClusterObj(SClusterObj *pCluster);

View File

@ -196,6 +196,7 @@ typedef struct {
int64_t createdTime;
int64_t updateTime;
int32_t upTime;
char active[TSDB_UNIQ_ACTIVE_KEY_LEN + 1];
SArray* pMachineIds;
} SClusterObj;

View File

@ -100,6 +100,34 @@ int64_t mndGetClusterId(SMnode *pMnode) {
return clusterId;
}
int32_t mndGetClusterActive(SMnode *pMnode, char *active) {
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster) {
if (active) strncpy(active, pCluster->active, TSDB_UNIQ_ACTIVE_KEY_LEN + 1);
mndReleaseCluster(pMnode, pCluster, pIter);
return 0;
}
return -1;
}
int32_t mndGetClusterMachineIds(SMnode *pMnode, SArray *pIds) {
int32_t code = -1;
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster) {
if (!pIds) pIds = taosArrayInit(taosArrayGetSize(pCluster->pMachineIds), sizeof(SMachineId));
if (pIds) {
taosArrayAddAll(pIds, pCluster->pMachineIds);
code = 0;
}
mndReleaseCluster(pMnode, pCluster, pIter);
}
return code;
}
int64_t mndGetClusterCreateTime(SMnode *pMnode) {
int64_t createTime = 0;
void *pIter = NULL;

View File

@ -761,109 +761,6 @@ _OVER:
return code;
}
static int32_t mndConfigDnode(SMnode *pMnode, SRpcMsg *pReq, SMCfgDnodeReq *pCfgReq, int8_t action) {
SSdbRaw *pRaw = NULL;
STrans *pTrans = NULL;
SDnodeObj *pDnode = NULL;
SArray *failRecord = NULL;
bool cfgAll = pCfgReq->dnodeId == -1;
int32_t cfgAllErr = 0;
int32_t iter = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
if (cfgAll) {
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break;
++iter;
} else if (!(pDnode = mndAcquireDnode(pMnode, pCfgReq->dnodeId))) {
goto _OVER;
}
SDnodeObj tmpDnode = *pDnode;
if (action == DND_ACTIVE_CODE) {
if (grantAlterActiveCode(pDnode->id, pDnode->active, pCfgReq->value, tmpDnode.active, 0) != 0) {
if (TSDB_CODE_DUP_KEY != terrno) {
mError("dnode:%d, config dnode:%d, app:%p config:%s value:%s failed since %s", pDnode->id, pCfgReq->dnodeId,
pReq->info.ahandle, pCfgReq->config, pCfgReq->value, terrstr());
if (cfgAll) { // alter all dnodes:
if (!failRecord) failRecord = taosArrayInit(1, sizeof(int32_t));
if (failRecord) taosArrayPush(failRecord, &pDnode->id);
if (0 == cfgAllErr) cfgAllErr = terrno; // output 1st terrno.
}
} else {
terrno = 0; // no action for dup active code
}
if (cfgAll) continue;
goto _OVER;
}
} else if (action == DND_CONN_ACTIVE_CODE) {
if (grantAlterActiveCode(pDnode->id, pDnode->connActive, pCfgReq->value, tmpDnode.connActive, 1) != 0) {
if (TSDB_CODE_DUP_KEY != terrno) {
mError("dnode:%d, config dnode:%d, app:%p config:%s value:%s failed since %s", pDnode->id, pCfgReq->dnodeId,
pReq->info.ahandle, pCfgReq->config, pCfgReq->value, terrstr());
if (cfgAll) {
if (!failRecord) failRecord = taosArrayInit(1, sizeof(int32_t));
if (failRecord) taosArrayPush(failRecord, &pDnode->id);
if (0 == cfgAllErr) cfgAllErr = terrno;
}
} else {
terrno = 0;
}
if (cfgAll) continue;
goto _OVER;
}
} else {
terrno = TSDB_CODE_INVALID_CFG;
goto _OVER;
}
if (!pTrans) {
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "config-dnode");
if (!pTrans) goto _OVER;
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
}
pRaw = mndDnodeActionEncode(&tmpDnode);
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
mInfo("dnode:%d, config dnode:%d, app:%p config:%s value:%s", pDnode->id, pCfgReq->dnodeId, pReq->info.ahandle,
pCfgReq->config, pCfgReq->value);
if (cfgAll) {
sdbRelease(pSdb, pDnode);
pDnode = NULL;
} else {
break;
}
}
if (pTrans && mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
tsGrantHBInterval = TMIN(TMAX(5, iter / 2), 30);
terrno = 0;
_OVER:
if (cfgAll) {
sdbRelease(pSdb, pDnode);
if (cfgAllErr != 0) terrno = cfgAllErr;
int32_t nFail = taosArrayGetSize(failRecord);
if (nFail > 0) {
mError("config dnode, cfg:%d, app:%p config:%s value:%s. total:%d, fail:%d", pCfgReq->dnodeId, pReq->info.ahandle,
pCfgReq->config, pCfgReq->value, iter, nFail);
}
} else {
mndReleaseDnode(pMnode, pDnode);
}
sdbCancelFetch(pSdb, pIter);
mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
taosArrayDestroy(failRecord);
return terrno;
}
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -1283,35 +1180,6 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
strcpy(dcfgReq.config, "supportvnodes");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
} else if (strncasecmp(cfgReq.config, GRANT_ACTIVE_CODE, 10) == 0 ||
strncasecmp(cfgReq.config, GRANT_C_ACTIVE_CODE, 11) == 0) {
int8_t opt = strncasecmp(cfgReq.config, "a", 1) == 0 ? DND_ACTIVE_CODE : DND_CONN_ACTIVE_CODE;
int8_t index = opt == DND_ACTIVE_CODE ? 10 : 11;
if (' ' != cfgReq.config[index] && 0 != cfgReq.config[index]) {
mError("dnode:%d, failed to config activeCode since invalid conf:%s", cfgReq.dnodeId, cfgReq.config);
terrno = TSDB_CODE_INVALID_CFG;
goto _err_out;
}
int32_t vlen = strlen(cfgReq.value);
if (vlen > 0 && ((opt == DND_ACTIVE_CODE && vlen != (TSDB_ACTIVE_KEY_LEN - 1)) ||
(opt == DND_CONN_ACTIVE_CODE &&
(vlen > (TSDB_CONN_ACTIVE_KEY_LEN - 1) || vlen < (TSDB_ACTIVE_KEY_LEN - 1))))) {
mError("dnode:%d, failed to config activeCode since invalid vlen:%d. conf:%s, val:%s", cfgReq.dnodeId, vlen,
cfgReq.config, cfgReq.value);
terrno = TSDB_CODE_INVALID_CFG;
goto _err_out;
}
strcpy(dcfgReq.config, opt == DND_ACTIVE_CODE ? GRANT_ACTIVE_CODE : GRANT_C_ACTIVE_CODE);
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%s", cfgReq.value);
if (mndConfigDnode(pMnode, pReq, &cfgReq, opt) != 0) {
mError("dnode:%d, failed to config activeCode since %s", cfgReq.dnodeId, terrstr());
terrno = TSDB_CODE_INVALID_CFG;
goto _err_out;
}
tFreeSMCfgDnodeReq(&cfgReq);
return 0;
} else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
int32_t optLen = strlen("s3blocksize");
int32_t flag = -1;

View File

@ -132,9 +132,6 @@ void grantRestore(EGrantType grant, uint64_t value) {}
char *grantGetMachineId(){return NULL};
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
int32_t grantAlterActiveCode(int32_t did, const char *old, const char *new, char *out, int8_t type) {
return TSDB_CODE_SUCCESS;
}
#endif

View File

@ -806,6 +806,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
int32_t sqlLen = 0;
terrno = TSDB_CODE_SUCCESS;
if ((terrno = grantCheck(TSDB_GRANT_STREAM)) < 0) {
goto _OVER;
}
SCMCreateStreamReq createStreamReq = {0};
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;

View File

@ -564,6 +564,10 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
return code;
}
if ((terrno = grantCheck(TSDB_GRANT_TOPIC)) < 0) {
goto _OVER;
}
if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;