feat: support uniq grant
This commit is contained in:
parent
4d8ba0f60f
commit
adee720128
|
@ -54,7 +54,8 @@ typedef enum {
|
|||
TSDB_GRANT_BACKUP_RESTORE,
|
||||
} EGrantType;
|
||||
|
||||
int32_t grantCheck(EGrantType grant);
|
||||
int32_t grantCheck(EGrantType grant); // less
|
||||
int32_t grantCheckLE(EGrantType grant); // less or equal
|
||||
char* tGetMachineId();
|
||||
#ifndef TD_UNIQ_GRANT
|
||||
int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, char* out, int8_t type);
|
||||
|
|
|
@ -110,6 +110,7 @@ enum {
|
|||
HEARTBEAT_KEY_TMQ,
|
||||
HEARTBEAT_KEY_DYN_VIEW,
|
||||
HEARTBEAT_KEY_VIEWINFO,
|
||||
HEARTBEAT_KEY_GRANT,
|
||||
};
|
||||
|
||||
typedef enum _mgmt_table {
|
||||
|
@ -1885,6 +1886,13 @@ int32_t tSerializeSViewHbRsp(void* buf, int32_t bufLen, SViewHbRsp* pRsp);
|
|||
int32_t tDeserializeSViewHbRsp(void* buf, int32_t bufLen, SViewHbRsp* pRsp);
|
||||
void tFreeSViewHbRsp(SViewHbRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
uint32_t flags;
|
||||
} SGrantHbRsp;
|
||||
|
||||
int32_t tSerializeSGrantHbRsp(void* buf, int32_t bufLen, SGrantHbRsp* pRsp);
|
||||
int32_t tDeserializeSGrantHbRsp(void* buf, int32_t bufLen, SGrantHbRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
int32_t numOfTables;
|
||||
int32_t numOfVgroup;
|
||||
|
|
|
@ -145,6 +145,10 @@ typedef struct SSTableVersion {
|
|||
int32_t smaVer;
|
||||
} SSTableVersion;
|
||||
|
||||
typedef struct SGrantVersion {
|
||||
int32_t version;
|
||||
} SGrantVersion;
|
||||
|
||||
typedef struct SDbCacheInfo {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
int64_t dbId;
|
||||
|
|
|
@ -327,6 +327,37 @@ static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatal
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t hbProcessGrantInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
|
||||
int32_t code = 0;
|
||||
|
||||
SGrantHbRsp hbRsp = {0};
|
||||
if (tDeserializeSGrantHbRsp(value, valueLen, &hbRsp) != 0) {
|
||||
taosArrayDestroyEx(hbRsp.pViewRsp, hbFreeSViewMetaInRsp);
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t numOfMeta = taosArrayGetSize(hbRsp.pViewRsp);
|
||||
for (int32_t i = 0; i < numOfMeta; ++i) {
|
||||
SViewMetaRsp *rsp = taosArrayGetP(hbRsp.pViewRsp, i);
|
||||
|
||||
if (rsp->numOfCols < 0) {
|
||||
tscDebug("hb to remove view, db:%s, view:%s", rsp->dbFName, rsp->name);
|
||||
catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId);
|
||||
tFreeSViewMetaRsp(rsp);
|
||||
taosMemoryFreeClear(rsp);
|
||||
} else {
|
||||
tscDebug("hb to update view, db:%s, view:%s", rsp->dbFName, rsp->name);
|
||||
catalogUpdateViewMeta(pCatalog, rsp);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(hbRsp.pViewRsp);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
static void hbProcessQueryRspKvs(int32_t kvNum, SArray* pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) {
|
||||
for (int32_t i = 0; i < kvNum; ++i) {
|
||||
|
@ -378,6 +409,15 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray* pKvs, struct SCatalog *p
|
|||
hbProcessViewInfoRsp(kv->value, kv->valueLen, pCatalog);
|
||||
break;
|
||||
}
|
||||
case HEARTBEAT_KEY_GRANT: {
|
||||
if (kv->valueLen <= 0 || NULL == kv->value) {
|
||||
tscError("invalid grant info, len:%d, value:%p", kv->valueLen, kv->value);
|
||||
break;
|
||||
}
|
||||
|
||||
// hbProcessGrantInfoRsp(kv->value, kv->valueLen, pCatalog);
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
default:
|
||||
tscError("invalid hb key type:%d", kv->key);
|
||||
|
|
|
@ -19,5 +19,6 @@
|
|||
#ifndef _GRANT
|
||||
|
||||
int32_t grantCheck(EGrantType grant) {return TSDB_CODE_SUCCESS;}
|
||||
int32_t grantCheckLE(EGrantType grant) {return TSDB_CODE_SUCCESS;}
|
||||
|
||||
#endif
|
|
@ -9303,3 +9303,32 @@ void tFreeSViewHbRsp(SViewHbRsp *pRsp) {
|
|||
|
||||
taosArrayDestroy(pRsp->pViewRsp);
|
||||
}
|
||||
|
||||
int32_t tSerializeSGrantHbRsp(void *buf, int32_t bufLen, SGrantHbRsp *pRsp) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
|
||||
if (tEncodeU32v(&encoder, pRsp->flags) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSGrantHbRsp(void *buf, int32_t bufLen, SGrantHbRsp *pRsp) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
|
||||
if (tDecodeU32v(&decoder, &pRsp->flags) < 0) return -1;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
|
@ -43,7 +43,7 @@
|
|||
|
||||
int32_t mndProcessConfigGrantReq(SMnode * pMnode, SRpcMsg * pReq, SMCfgClusterReq * pCfg);
|
||||
int32_t mndProcessUpdGrantLog(SMnode * pMnode, SRpcMsg * pReq, SArray * pMachines, SGrantState * pState);
|
||||
|
||||
int32_t mndValidateGrant(SMnode * pMnode, SGrantVersion * pGrantVersion, void **ppRsp, int32_t *pRspLen);
|
||||
int32_t mndGrantGetLastState(SMnode * pMnode, SGrantState * pState);
|
||||
SGrantLogObj *mndAcquireGrant(SMnode * pMnode, void **ppIter);
|
||||
void mndReleaseGrant(SMnode * pMnode, SGrantLogObj * pGrant, void *pIter);
|
||||
|
|
|
@ -107,7 +107,7 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
|
|||
goto FAILED;
|
||||
}
|
||||
|
||||
if ((terrno = grantCheck(TSDB_GRANT_SUBSCRIPTION)) < 0) {
|
||||
if ((terrno = grantCheckLE(TSDB_GRANT_SUBSCRIPTION)) < 0) {
|
||||
code = terrno;
|
||||
goto FAILED;
|
||||
}
|
||||
|
@ -240,9 +240,10 @@ static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbR
|
|||
}
|
||||
STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
|
||||
strcpy(data->topic, topic);
|
||||
if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIPTION) < 0) {
|
||||
if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 ||
|
||||
grantCheckLE(TSDB_GRANT_SUBSCRIPTION) < 0) {
|
||||
data->noPrivilege = 1;
|
||||
} else{
|
||||
} else {
|
||||
data->noPrivilege = 0;
|
||||
}
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "audit.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndGrant.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndQnode.h"
|
||||
|
@ -605,6 +606,16 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
|||
}
|
||||
break;
|
||||
}
|
||||
case HEARTBEAT_KEY_GRANT: {
|
||||
void *rspMsg = NULL;
|
||||
int32_t rspLen = 0;
|
||||
mndValidateGrant(pMnode, kv->value, &rspMsg, &rspLen);
|
||||
if (rspMsg && rspLen > 0) {
|
||||
SKv kv1 = {.key = HEARTBEAT_KEY_GRANT, .valueLen = rspLen, .value = rspMsg};
|
||||
taosArrayPush(hbRsp.info, &kv1);
|
||||
}
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
default:
|
||||
mError("invalid kv key:%d", kv->key);
|
||||
|
|
|
@ -1628,7 +1628,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
SMnode *pMnode = pReq->info.node;
|
||||
SStreamObj *pStream = NULL;
|
||||
|
||||
if(grantCheck(TSDB_GRANT_STREAMS) < 0){
|
||||
if(grantCheckLE(TSDB_GRANT_STREAMS) < 0){
|
||||
terrno = TSDB_CODE_GRANT_EXPIRED;
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -225,7 +225,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
|
||||
SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
|
||||
|
||||
if(grantCheck(TSDB_GRANT_STREAMS) < 0){
|
||||
if(grantCheckLE(TSDB_GRANT_STREAMS) < 0){
|
||||
if(suspendAllStreams(pMnode, &pReq->info) < 0){
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -2193,6 +2193,10 @@ static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
|
|||
return buildInvalidOperationMsg(&pCxt->msg, "proxy mode does not support csv loading");
|
||||
}
|
||||
|
||||
// if ((terrno = grantCheck(TSDB_GRANT_CSV)) < 0) {
|
||||
// return buildInvalidOperationMsg(&pCxt->msg, terrstr());
|
||||
// }
|
||||
|
||||
NEXT_TOKEN(pStmt->pSql, *pToken);
|
||||
if (0 == pToken->n || (TK_NK_STRING != pToken->type && TK_NK_ID != pToken->type)) {
|
||||
return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", pToken->z);
|
||||
|
@ -2757,6 +2761,10 @@ static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifyOpSt
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SRowsDataContext rowsDataCxt;
|
||||
|
||||
// if ((code = grantCheck(TSDB_GRANT_CSV)) < 0) {
|
||||
// return code;
|
||||
// }
|
||||
|
||||
if (!pStmt->stbSyntax) {
|
||||
STableDataCxt* pTableCxt = NULL;
|
||||
code = getTableDataCxt(pCxt, pStmt, &pTableCxt);
|
||||
|
|
Loading…
Reference in New Issue