From adee720128159c1f3236f28959d81c4e432bfb27 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 1 Feb 2024 17:48:29 +0800 Subject: [PATCH] feat: support uniq grant --- include/common/tgrant.h | 3 +- include/common/tmsg.h | 8 +++++ include/libs/catalog/catalog.h | 4 +++ source/client/src/clientHb.c | 40 +++++++++++++++++++++++ source/common/src/tgrant.c | 1 + source/common/src/tmsg.c | 29 ++++++++++++++++ source/dnode/mnode/impl/inc/mndGrant.h | 6 ++-- source/dnode/mnode/impl/src/mndConsumer.c | 7 ++-- source/dnode/mnode/impl/src/mndProfile.c | 11 +++++++ source/dnode/mnode/impl/src/mndStream.c | 2 +- source/dnode/mnode/impl/src/mndStreamHb.c | 2 +- source/libs/parser/src/parInsertSql.c | 12 +++++-- 12 files changed, 114 insertions(+), 11 deletions(-) diff --git a/include/common/tgrant.h b/include/common/tgrant.h index 688fac858e..9f7572ed63 100644 --- a/include/common/tgrant.h +++ b/include/common/tgrant.h @@ -54,7 +54,8 @@ typedef enum { TSDB_GRANT_BACKUP_RESTORE, } EGrantType; -int32_t grantCheck(EGrantType grant); +int32_t grantCheck(EGrantType grant); // less +int32_t grantCheckLE(EGrantType grant); // less or equal char* tGetMachineId(); #ifndef TD_UNIQ_GRANT int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, char* out, int8_t type); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 04502eb64a..9b493c4a40 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -110,6 +110,7 @@ enum { HEARTBEAT_KEY_TMQ, HEARTBEAT_KEY_DYN_VIEW, HEARTBEAT_KEY_VIEWINFO, + HEARTBEAT_KEY_GRANT, }; typedef enum _mgmt_table { @@ -1885,6 +1886,13 @@ int32_t tSerializeSViewHbRsp(void* buf, int32_t bufLen, SViewHbRsp* pRsp); int32_t tDeserializeSViewHbRsp(void* buf, int32_t bufLen, SViewHbRsp* pRsp); void tFreeSViewHbRsp(SViewHbRsp* pRsp); +typedef struct { + uint32_t flags; +} SGrantHbRsp; + +int32_t tSerializeSGrantHbRsp(void* buf, int32_t bufLen, SGrantHbRsp* pRsp); +int32_t tDeserializeSGrantHbRsp(void* buf, int32_t bufLen, SGrantHbRsp* pRsp); + typedef struct { int32_t numOfTables; int32_t numOfVgroup; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index b34b998d76..3649f369b7 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -145,6 +145,10 @@ typedef struct SSTableVersion { int32_t smaVer; } SSTableVersion; +typedef struct SGrantVersion { + int32_t version; +} SGrantVersion; + typedef struct SDbCacheInfo { char dbFName[TSDB_DB_FNAME_LEN]; int64_t dbId; diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 63a65d7c95..f4c2fbec5a 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -327,6 +327,37 @@ static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatal return TSDB_CODE_SUCCESS; } +#if 0 +static int32_t hbProcessGrantInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { + int32_t code = 0; + + SGrantHbRsp hbRsp = {0}; + if (tDeserializeSGrantHbRsp(value, valueLen, &hbRsp) != 0) { + taosArrayDestroyEx(hbRsp.pViewRsp, hbFreeSViewMetaInRsp); + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + int32_t numOfMeta = taosArrayGetSize(hbRsp.pViewRsp); + for (int32_t i = 0; i < numOfMeta; ++i) { + SViewMetaRsp *rsp = taosArrayGetP(hbRsp.pViewRsp, i); + + if (rsp->numOfCols < 0) { + tscDebug("hb to remove view, db:%s, view:%s", rsp->dbFName, rsp->name); + catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId); + tFreeSViewMetaRsp(rsp); + taosMemoryFreeClear(rsp); + } else { + tscDebug("hb to update view, db:%s, view:%s", rsp->dbFName, rsp->name); + catalogUpdateViewMeta(pCatalog, rsp); + } + } + + taosArrayDestroy(hbRsp.pViewRsp); + return TSDB_CODE_SUCCESS; +} +#endif + static void hbProcessQueryRspKvs(int32_t kvNum, SArray* pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) { for (int32_t i = 0; i < kvNum; ++i) { @@ -378,6 +409,15 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray* pKvs, struct SCatalog *p hbProcessViewInfoRsp(kv->value, kv->valueLen, pCatalog); break; } + case HEARTBEAT_KEY_GRANT: { + if (kv->valueLen <= 0 || NULL == kv->value) { + tscError("invalid grant info, len:%d, value:%p", kv->valueLen, kv->value); + break; + } + + // hbProcessGrantInfoRsp(kv->value, kv->valueLen, pCatalog); + break; + } #endif default: tscError("invalid hb key type:%d", kv->key); diff --git a/source/common/src/tgrant.c b/source/common/src/tgrant.c index f212d71362..8e4fe9febb 100644 --- a/source/common/src/tgrant.c +++ b/source/common/src/tgrant.c @@ -19,5 +19,6 @@ #ifndef _GRANT int32_t grantCheck(EGrantType grant) {return TSDB_CODE_SUCCESS;} +int32_t grantCheckLE(EGrantType grant) {return TSDB_CODE_SUCCESS;} #endif \ No newline at end of file diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index de505ab3d2..c866aae209 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9303,3 +9303,32 @@ void tFreeSViewHbRsp(SViewHbRsp *pRsp) { taosArrayDestroy(pRsp->pViewRsp); } + +int32_t tSerializeSGrantHbRsp(void *buf, int32_t bufLen, SGrantHbRsp *pRsp) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeU32v(&encoder, pRsp->flags) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSGrantHbRsp(void *buf, int32_t bufLen, SGrantHbRsp *pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeU32v(&decoder, &pRsp->flags) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/inc/mndGrant.h b/source/dnode/mnode/impl/inc/mndGrant.h index 82b3260860..a012d7a8f5 100644 --- a/source/dnode/mnode/impl/inc/mndGrant.h +++ b/source/dnode/mnode/impl/inc/mndGrant.h @@ -41,9 +41,9 @@ char **mergeActive); #endif - int32_t mndProcessConfigGrantReq(SMnode * pMnode, SRpcMsg * pReq, SMCfgClusterReq * pCfg); - int32_t mndProcessUpdGrantLog(SMnode * pMnode, SRpcMsg * pReq, SArray * pMachines, SGrantState * pState); - + int32_t mndProcessConfigGrantReq(SMnode * pMnode, SRpcMsg * pReq, SMCfgClusterReq * pCfg); + int32_t mndProcessUpdGrantLog(SMnode * pMnode, SRpcMsg * pReq, SArray * pMachines, SGrantState * pState); + int32_t mndValidateGrant(SMnode * pMnode, SGrantVersion * pGrantVersion, void **ppRsp, int32_t *pRspLen); int32_t mndGrantGetLastState(SMnode * pMnode, SGrantState * pState); SGrantLogObj *mndAcquireGrant(SMnode * pMnode, void **ppIter); void mndReleaseGrant(SMnode * pMnode, SGrantLogObj * pGrant, void *pIter); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 3ce548a4f6..753076f1f3 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -107,7 +107,7 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode * goto FAILED; } - if ((terrno = grantCheck(TSDB_GRANT_SUBSCRIPTION)) < 0) { + if ((terrno = grantCheckLE(TSDB_GRANT_SUBSCRIPTION)) < 0) { code = terrno; goto FAILED; } @@ -240,9 +240,10 @@ static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbR } STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1); strcpy(data->topic, topic); - if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIPTION) < 0) { + if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 || + grantCheckLE(TSDB_GRANT_SUBSCRIPTION) < 0) { data->noPrivilege = 1; - } else{ + } else { data->noPrivilege = 0; } mndReleaseTopic(pMnode, pTopic); diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index e0559b4c48..d7e6d9609c 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -18,6 +18,7 @@ #include "audit.h" #include "mndDb.h" #include "mndDnode.h" +#include "mndGrant.h" #include "mndMnode.h" #include "mndPrivilege.h" #include "mndQnode.h" @@ -605,6 +606,16 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb } break; } + case HEARTBEAT_KEY_GRANT: { + void *rspMsg = NULL; + int32_t rspLen = 0; + mndValidateGrant(pMnode, kv->value, &rspMsg, &rspLen); + if (rspMsg && rspLen > 0) { + SKv kv1 = {.key = HEARTBEAT_KEY_GRANT, .valueLen = rspLen, .value = rspMsg}; + taosArrayPush(hbRsp.info, &kv1); + } + break; + } #endif default: mError("invalid kv key:%d", kv->key); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5f5fbdba13..530b3d5bcd 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1628,7 +1628,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; - if(grantCheck(TSDB_GRANT_STREAMS) < 0){ + if(grantCheckLE(TSDB_GRANT_STREAMS) < 0){ terrno = TSDB_CODE_GRANT_EXPIRED; return -1; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 4426ab0672..005caea31b 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -225,7 +225,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask)); - if(grantCheck(TSDB_GRANT_STREAMS) < 0){ + if(grantCheckLE(TSDB_GRANT_STREAMS) < 0){ if(suspendAllStreams(pMnode, &pReq->info) < 0){ return -1; } diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 512dfdaef2..8e32eca28e 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -2193,6 +2193,10 @@ static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS return buildInvalidOperationMsg(&pCxt->msg, "proxy mode does not support csv loading"); } + // if ((terrno = grantCheck(TSDB_GRANT_CSV)) < 0) { + // return buildInvalidOperationMsg(&pCxt->msg, terrstr()); + // } + NEXT_TOKEN(pStmt->pSql, *pToken); if (0 == pToken->n || (TK_NK_STRING != pToken->type && TK_NK_ID != pToken->type)) { return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", pToken->z); @@ -2754,11 +2758,15 @@ static int32_t parseInsertSqlFromStart(SInsertParseContext* pCxt, SVnodeModifyOp } static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SRowsDataContext rowsDataCxt; + // if ((code = grantCheck(TSDB_GRANT_CSV)) < 0) { + // return code; + // } + if (!pStmt->stbSyntax) { - STableDataCxt* pTableCxt = NULL; + STableDataCxt* pTableCxt = NULL; code = getTableDataCxt(pCxt, pStmt, &pTableCxt); rowsDataCxt.pTableDataCxt = pTableCxt; } else {