feat: support uniq grant

This commit is contained in:
kailixu 2024-01-22 13:20:15 +08:00
parent 5208829f23
commit dbd3c53182
9 changed files with 42 additions and 87 deletions

View File

@ -57,42 +57,39 @@ typedef enum {
TSDB_GRANT_BACKUP_RESTORE_EXPIRE,
} EGrantType;
typedef struct {
int64_t grantedTime;
} SGrantedInfo;
int32_t grantCheck(EGrantType grant);
char* grantGetMachineId();
#ifndef GRANTS_CFG
#ifdef TD_ENTERPRISE
#define GRANTS_SCHEMA \
static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "topics", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "cpu_cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "multi_tier_storage_expire", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "streams_expire", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "topics_expire", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "audit_log_expire", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "csv_expire", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "backup_restore_expire", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "data_ins", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "opc_ua", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "pi", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "kafka", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "influxdb", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "mqtt", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "avevahistorian", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "opentsdb", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "tdengine2.6", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "tdengine3.0", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
#define GRANTS_SCHEMA \
static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "subscriptions", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "cpu_cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "multi_tier_storage_expire", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "streams_expire", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "subscriptions_expire", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "audit_log_expire", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "csv_expire", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "backup_restore_expire", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
}
// {.name = "opc_ua", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
// {.name = "pi", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
// {.name = "kafka", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
// {.name = "influxdb", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
// {.name = "mqtt", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
// {.name = "avevahistorian", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
// {.name = "opentsdb", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
// {.name = "tdengine2.6", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
// {.name = "tdengine3.0", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
// }
#else
#define GRANTS_SCHEMA \
static const SSysDbTableSchema grantsSchema[] = { \

View File

@ -413,6 +413,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MNODE_ONLY_TWO_MNODE TAOS_DEF_ERROR_CODE(0, 0x0414) // internal
#define TSDB_CODE_MNODE_NO_NEED_RESTORE TAOS_DEF_ERROR_CODE(0, 0x0415) // internal
#define TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0416)
#define TSDB_CODE_DNODE_NO_MACHINE_CODE TAOS_DEF_ERROR_CODE(0, 0x0417)
// mnode-sma
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)

View File

@ -48,7 +48,6 @@ static const SSysDbTableSchema mnodesSchema[] = {
{.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "role_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
// {.name = "machine_id", .bytes = TSDB_MACHINE_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
};
static const SSysDbTableSchema modulesSchema[] = {
@ -76,10 +75,6 @@ static const SSysDbTableSchema clusterSchema[] = {
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "expire_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
#ifdef TD_ENTERPRISE
{.name = "granted_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "active_code", .bytes = TSDB_UNIQ_ACTIVE_KEY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
#endif
};
static const SSysDbTableSchema userDBSchema[] = {

View File

@ -28,8 +28,6 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len);
int64_t mndGetClusterId(SMnode *pMnode);
int64_t mndGetClusterCreateTime(SMnode *pMnode);
int64_t mndGetClusterUpTime(SMnode *pMnode);
int32_t mndGetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo);
int32_t mndGetClusterActive(SMnode *pMnode, char* active);
#ifdef __cplusplus
}

View File

@ -197,7 +197,6 @@ typedef struct {
int64_t createdTime;
int64_t updateTime;
int32_t upTime;
int64_t grantedTime;
char active[TSDB_UNIQ_ACTIVE_KEY_LEN];
} SClusterObj;
@ -770,7 +769,7 @@ typedef struct {
// SGrantObj
typedef enum {
GRANT_STATE_UNKNOWN = 0,
GRANT_STATE_INIT = 0,
GRANT_STATE_UNGRANTED = 1,
GRANT_STATE_GRANTED = 2,
GRANT_STATE_EXPIRED = 3,
@ -778,7 +777,7 @@ typedef enum {
} EGrantState;
typedef enum {
GRANT_STATE_REASON_UNKNOWN = 0,
GRANT_STATE_REASON_INIT = 0,
GRANT_STATE_REASON_ALTER = 1, // alter activeCode 'revoked' or 'xxx'
GRANT_STATE_REASON_MISMATCH = 2, // dnode machine mismatch
GRANT_STATE_REASON_EXPIRE = 3, // expire

View File

@ -38,11 +38,13 @@
int32_t grantAlterActiveCode(const char *active, char **newActive);
int32_t mndProcessConfigGrantReq(SRpcMsg * pReq, SMCfgClusterReq * pCfg);
int32_t mndProcessUpdMachineReq(SRpcMsg * pReq, SArray *pMachines);
int32_t mndProcessUpdStateReq(SRpcMsg * pReq, SGrantState *pState);
int32_t mndProcessConfigGrantReq(SMnode * pMnode, SRpcMsg * pReq, SMCfgClusterReq * pCfg);
int32_t mndProcessUpdMachineReq(SMnode * pMnode, SRpcMsg * pReq, SArray * pMachines);
int32_t mndProcessUpdStateReq(SMnode * pMnode, SRpcMsg * pReq, SGrantState * pState);
int32_t mndGrantGetLastState(SMnode * pMnode, SGrantState * pState);
int32_t mndGrantGetLastState(SMnode * pMnode, SGrantState * pState);
SGrantObj *mndAcquireGrant(SMnode * pMnode, void **ppIter);
void mndReleaseGrant(SMnode * pMnode, SGrantObj * pGrant, void *pIter);
#endif
#ifdef __cplusplus

View File

@ -21,7 +21,7 @@
#include "mndShow.h"
#include "mndTrans.h"
#define CLUSTER_VER_NUMBE 2
#define CLUSTER_VER_NUMBE 1
#define CLUSTER_RESERVE_SIZE 60
int64_t tsExpireTime = 0;
@ -107,30 +107,6 @@ int64_t mndGetClusterId(SMnode *pMnode) {
return clusterId;
}
int32_t mndGetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo) {
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster != NULL) {
pInfo->grantedTime = pCluster->grantedTime;
mndReleaseCluster(pMnode, pCluster, pIter);
return 0;
}
return -1;
}
int32_t mndGetClusterActive(SMnode *pMnode, char *active) {
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster) {
if (active) strncpy(active, pCluster->active, TSDB_UNIQ_ACTIVE_KEY_LEN);
mndReleaseCluster(pMnode, pCluster, pIter);
return 0;
}
return -1;
}
int64_t mndGetClusterCreateTime(SMnode *pMnode) {
int64_t createTime = 0;
void *pIter = NULL;
@ -177,8 +153,6 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, pCluster->upTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->grantedTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pCluster->active, TSDB_UNIQ_ACTIVE_KEY_LEN, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER);
@ -203,7 +177,7 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver < 0 || sver > CLUSTER_VER_NUMBE) {
if (sver != CLUSTER_VER_NUMBE) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER;
}
@ -220,11 +194,6 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pCluster->upTime, _OVER)
if (sver > 1) {
SDB_GET_INT64(pRaw, dataPos, &pCluster->grantedTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pCluster->active, TSDB_UNIQ_ACTIVE_KEY_LEN, _OVER)
}
SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
terrno = 0;
@ -348,16 +317,6 @@ static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *
colDataSetVal(pColInfo, numOfRows, (const char *)&tsExpireTime, false);
}
#ifdef TD_ENTERPRISE
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pCluster->grantedTime, false);
char active[TSDB_UNIQ_ACTIVE_KEY_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(active, pCluster->active, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&active, false);
#endif
sdbRelease(pSdb, pCluster);
numOfRows++;
}
@ -437,7 +396,7 @@ int32_t mndProcessConfigClusterReq(SRpcMsg *pReq) {
if (strncmp(cfgReq.config, GRANT_ACTIVE_CODE, TSDB_DNODE_CONFIG_LEN) == 0) {
#ifdef TD_ENTERPRISE
if (0 != (code = mndProcessConfigGrantReq(pReq, &cfgReq))) {
if (0 != (code = mndProcessConfigGrantReq(pMnode, pReq, &cfgReq))) {
goto _exit;
}
#else

View File

@ -149,6 +149,9 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
if (machineId) {
memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
taosMemoryFreeClear(machineId);
} else {
terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
goto _OVER;
}
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");

View File

@ -345,6 +345,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_ALREADY_IS_VOTER, "Mnode already is a le
TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_ONLY_TWO_MNODE, "Only two mnodes exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_NO_NEED_RESTORE, "No need to restore on this dnode")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE, "Please use this command when the dnode is offline")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_NO_MACHINE_CODE, "Dnode can not get machine code")
// vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode is closed or removed")