Merge pull request #14979 from taosdata/feature/grant-3.0
feat: support grant
This commit is contained in:
commit
5ef9239c74
|
@ -268,6 +268,26 @@ typedef struct SSortExecInfo {
|
|||
int32_t readBytes; // read io bytes
|
||||
} SSortExecInfo;
|
||||
|
||||
//======================================================================================================================
|
||||
// for grant
|
||||
typedef enum {
|
||||
TSDB_GRANT_ALL,
|
||||
TSDB_GRANT_TIME,
|
||||
TSDB_GRANT_USER,
|
||||
TSDB_GRANT_DB,
|
||||
TSDB_GRANT_TIMESERIES,
|
||||
TSDB_GRANT_DNODE,
|
||||
TSDB_GRANT_ACCT,
|
||||
TSDB_GRANT_STORAGE,
|
||||
TSDB_GRANT_SPEED,
|
||||
TSDB_GRANT_QUERY_TIME,
|
||||
TSDB_GRANT_CONNS,
|
||||
TSDB_GRANT_STREAMS,
|
||||
TSDB_GRANT_CPU_CORES,
|
||||
} EGrantType;
|
||||
|
||||
int32_t grantCheck(EGrantType grant);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -139,6 +139,7 @@ extern int32_t tsTransPullupInterval;
|
|||
extern int32_t tsMqRebalanceInterval;
|
||||
extern int32_t tsTtlUnit;
|
||||
extern int32_t tsTtlPushInterval;
|
||||
extern int32_t tsGrantHBInterval;
|
||||
|
||||
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
|
||||
|
||||
|
|
|
@ -152,6 +152,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GRANT_HB_TIMER, "grant-hb-tmr", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL)
|
||||
|
|
|
@ -184,6 +184,7 @@ int32_t tsTransPullupInterval = 2;
|
|||
int32_t tsMqRebalanceInterval = 2;
|
||||
int32_t tsTtlUnit = 86400;
|
||||
int32_t tsTtlPushInterval = 60;
|
||||
int32_t tsGrantHBInterval = 60;
|
||||
|
||||
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
|
||||
tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN);
|
||||
|
|
|
@ -46,6 +46,7 @@ int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
|||
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t dmProcessGrantReq(SRpcMsg *pMsg);
|
||||
|
||||
// dmWorker.c
|
||||
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
|
|
@ -331,7 +331,8 @@ SArray *dmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
// Requests handled by MNODE
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
// if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
|
|
@ -144,6 +144,9 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
case TDMT_DND_SYSTABLE_RETRIEVE:
|
||||
code = dmProcessRetrieve(pMgmt, pMsg);
|
||||
break;
|
||||
case TDMT_MND_GRANT:
|
||||
code = dmProcessGrantReq(pMsg);
|
||||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
break;
|
||||
|
|
|
@ -206,7 +206,7 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
// if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -153,9 +153,15 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
|||
|
||||
switch (qtype) {
|
||||
case QUERY_QUEUE:
|
||||
vnodePreprocessQueryMsg(pVnode->pImpl, pMsg);
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pQueryQ, pMsg);
|
||||
if ((pMsg->msgType == TDMT_SCH_QUERY) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) {
|
||||
terrno = TSDB_CODE_GRANT_EXPIRED;
|
||||
code = terrno;
|
||||
dDebug("vgId:%d, msg:%p put into vnode-query queue failed since %s", pVnode->vgId, pMsg, terrstr());
|
||||
} else {
|
||||
vnodePreprocessQueryMsg(pVnode->pImpl, pMsg);
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pQueryQ, pMsg);
|
||||
}
|
||||
break;
|
||||
case STREAM_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg);
|
||||
|
@ -166,8 +172,14 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
|||
taosWriteQitem(pVnode->pFetchQ, pMsg);
|
||||
break;
|
||||
case WRITE_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pWriteQ, pMsg);
|
||||
if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) {
|
||||
terrno = TSDB_CODE_VND_NO_WRITE_AUTH;
|
||||
code = terrno;
|
||||
dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr());
|
||||
} else {
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pWriteQ, pMsg);
|
||||
}
|
||||
break;
|
||||
case SYNC_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
|
||||
|
|
|
@ -26,6 +26,7 @@ int32_t mndInitCluster(SMnode *pMnode);
|
|||
void mndCleanupCluster(SMnode *pMnode);
|
||||
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len);
|
||||
int64_t mndGetClusterId(SMnode *pMnode);
|
||||
int64_t mndGetClusterCreateTime(SMnode *pMnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode);
|
|||
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode);
|
||||
int32_t mndGetDnodeSize(SMnode *pMnode);
|
||||
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs);
|
||||
void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -22,27 +22,10 @@
|
|||
|
||||
#include "mndInt.h"
|
||||
|
||||
typedef enum {
|
||||
TSDB_GRANT_ALL,
|
||||
TSDB_GRANT_TIME,
|
||||
TSDB_GRANT_USER,
|
||||
TSDB_GRANT_DB,
|
||||
TSDB_GRANT_TIMESERIES,
|
||||
TSDB_GRANT_DNODE,
|
||||
TSDB_GRANT_ACCT,
|
||||
TSDB_GRANT_STORAGE,
|
||||
TSDB_GRANT_SPEED,
|
||||
TSDB_GRANT_QUERY_TIME,
|
||||
TSDB_GRANT_CONNS,
|
||||
TSDB_GRANT_STREAMS,
|
||||
TSDB_GRANT_CPU_CORES,
|
||||
} EGrantType;
|
||||
|
||||
int32_t mndInitGrant();
|
||||
int32_t mndInitGrant(SMnode *pMnode);
|
||||
void mndCleanupGrant();
|
||||
void grantParseParameter();
|
||||
int32_t grantCheck(EGrantType grant);
|
||||
void grantReset(EGrantType grant, uint64_t value);
|
||||
void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value);
|
||||
void grantAdd(EGrantType grant, uint64_t value);
|
||||
void grantRestore(EGrantType grant, uint64_t value);
|
||||
|
||||
|
|
|
@ -79,6 +79,23 @@ int64_t mndGetClusterId(SMnode *pMnode) {
|
|||
return clusterId;
|
||||
}
|
||||
|
||||
int64_t mndGetClusterCreateTime(SMnode *pMnode) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
int64_t createTime = INT64_MAX;
|
||||
|
||||
while (1) {
|
||||
SClusterObj *pCluster = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pCluster);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
createTime = pCluster->createdTime;
|
||||
sdbRelease(pSdb, pCluster);
|
||||
}
|
||||
|
||||
return createTime;
|
||||
}
|
||||
|
||||
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
|
|
|
@ -509,6 +509,12 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
|
|||
SUserObj *pUser = NULL;
|
||||
SCreateDbReq createReq = {0};
|
||||
|
||||
code = grantCheck(TSDB_GRANT_DB);
|
||||
if (code != 0) {
|
||||
terrno = code;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (tDeserializeSCreateDbReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
goto _OVER;
|
||||
|
|
|
@ -262,7 +262,7 @@ bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) {
|
||||
void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
int32_t numOfEps = 0;
|
||||
|
@ -621,6 +621,12 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
|
|||
SDnodeObj *pDnode = NULL;
|
||||
SCreateDnodeReq createReq = {0};
|
||||
|
||||
code = grantCheck(TSDB_GRANT_DNODE);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
goto _OVER;
|
||||
|
|
|
@ -118,17 +118,21 @@ static int32_t mndRetrieveGrant(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
return numOfRows;
|
||||
}
|
||||
|
||||
static int32_t mndProcessGrantHB(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; }
|
||||
|
||||
int32_t mndInitGrant(SMnode *pMnode) {
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_GRANTS, mndRetrieveGrant);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_GRANT_HB_TIMER, mndProcessGrantHB);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mndCleanupGrant() {}
|
||||
void grantParseParameter() { mError("can't parsed parameter k"); }
|
||||
int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; }
|
||||
void grantReset(EGrantType grant, uint64_t value) {}
|
||||
void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value) {}
|
||||
void grantAdd(EGrantType grant, uint64_t value) {}
|
||||
void grantRestore(EGrantType grant, uint64_t value) {}
|
||||
int32_t dmProcessGrantReq(SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
|
||||
|
||||
#endif
|
||||
|
||||
|
|
|
@ -90,6 +90,16 @@ static void mndPullupTelem(SMnode *pMnode) {
|
|||
}
|
||||
}
|
||||
|
||||
static void mndGrantHeartBeat(SMnode *pMnode) {
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildTimerMsg(&contLen);
|
||||
if (pReq != NULL) {
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527};
|
||||
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
||||
}
|
||||
}
|
||||
|
||||
static void *mndThreadFp(void *param) {
|
||||
SMnode *pMnode = param;
|
||||
int64_t lastTime = 0;
|
||||
|
@ -115,6 +125,10 @@ static void *mndThreadFp(void *param) {
|
|||
if (lastTime % (tsTelemInterval * 10) == 0) {
|
||||
mndPullupTelem(pMnode);
|
||||
}
|
||||
|
||||
if (lastTime % (tsGrantHBInterval * 10) == 0) {
|
||||
mndGrantHeartBeat(pMnode);
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
@ -402,6 +416,9 @@ int32_t mndStart(SMnode *pMnode) {
|
|||
}
|
||||
mndSetRestore(pMnode, true);
|
||||
}
|
||||
|
||||
grantReset(pMnode, TSDB_GRANT_ALL, 0);
|
||||
|
||||
return mndInitTimer(pMnode);
|
||||
}
|
||||
|
||||
|
|
|
@ -363,6 +363,12 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
code = grantCheck(TSDB_GRANT_USER);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
code = mndCreateUser(pMnode, pOperUser->acct, &createReq, pReq);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
|
|
|
@ -99,7 +99,8 @@ STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
|
|||
int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);
|
||||
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
|
||||
tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name);
|
||||
int metaGetTbNum(SMeta* pMeta);
|
||||
int64_t metaGetTbNum(SMeta* pMeta);
|
||||
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
|
||||
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid);
|
||||
void metaCloseCtbCursor(SMCtbCursor* pCtbCur);
|
||||
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
|
||||
|
|
|
@ -463,10 +463,16 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
int metaGetTbNum(SMeta *pMeta) {
|
||||
// N.B. Called by statusReq per second
|
||||
int64_t metaGetTbNum(SMeta *pMeta) {
|
||||
// TODO
|
||||
// ASSERT(0);
|
||||
return 0;
|
||||
return 100;
|
||||
}
|
||||
|
||||
// N.B. Called by statusReq per second
|
||||
int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
|
||||
// TODO
|
||||
return 400;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -239,9 +239,9 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
|||
pLoad->vgId = TD_VID(pVnode);
|
||||
pLoad->syncState = syncGetMyRole(pVnode->sync);
|
||||
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
||||
pLoad->numOfTimeSeries = 400;
|
||||
pLoad->totalStorage = 300;
|
||||
pLoad->compStorage = 200;
|
||||
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
|
||||
pLoad->totalStorage = (int64_t)3 * 1073741824;
|
||||
pLoad->compStorage = (int64_t)2 * 1073741824;
|
||||
pLoad->pointsWritten = 100;
|
||||
pLoad->numOfSelectReqs = 1;
|
||||
pLoad->numOfInsertReqs = 3;
|
||||
|
|
|
@ -272,7 +272,7 @@ void ctgFreeHandle(SCatalog* pCtg) {
|
|||
|
||||
taosMemoryFree(pCtg);
|
||||
|
||||
ctgInfo("handle freed, culsterId:0x%" PRIx64, clusterId);
|
||||
ctgInfo("handle freed, clusterId:0x%" PRIx64, clusterId);
|
||||
}
|
||||
|
||||
void ctgClearHandle(SCatalog* pCtg) {
|
||||
|
@ -303,7 +303,7 @@ void ctgClearHandle(SCatalog* pCtg) {
|
|||
|
||||
CTG_CACHE_STAT_INC(numOfClear, 1);
|
||||
|
||||
ctgInfo("handle cleared, culsterId:0x%" PRIx64, clusterId);
|
||||
ctgInfo("handle cleared, clusterId:0x%" PRIx64, clusterId);
|
||||
}
|
||||
|
||||
void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) {
|
||||
|
|
Loading…
Reference in New Issue