feat: support uniq grant

This commit is contained in:
kailixu 2024-02-04 00:24:42 +08:00
parent 3a08926319
commit f53d0bce5d
13 changed files with 84 additions and 28 deletions

View File

@ -30,6 +30,10 @@ extern "C" {
#define GRANT_HEART_BEAT_MIN 2
#define GRANT_ACTIVE_CODE "activeCode"
#define GRANT_ALL_FLAG (0x01)
#define GRANT_AUDIT_FLAG (0x02)
#define GRANT_CSV_FLAG (0x04)
#define GRANT_VIEW_FLAG (0x08)
typedef enum {
TSDB_GRANT_ALL,
@ -50,6 +54,7 @@ typedef enum {
TSDB_GRANT_SUBSCRIPTION,
TSDB_GRANT_AUDIT,
TSDB_GRANT_CSV,
TSDB_GRANT_VIEW,
TSDB_GRANT_MULTI_TIER,
TSDB_GRANT_BACKUP_RESTORE,
} EGrantType;

View File

@ -3931,6 +3931,7 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
#define SUBMIT_REQ_FROM_FILE 0x4
typedef struct {
int32_t flags;

View File

@ -25,6 +25,7 @@ extern "C" {
#include "taosdef.h"
#include "tarray.h"
#include "tcommon.h"
#include "tgrant.h"
#include "thash.h"
#include "tmsg.h"
#include "tname.h"
@ -364,7 +365,9 @@ int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* f
int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, SUserAuthInfo *pAuth, SUserAuthRes* pRes);
int32_t catalogChkAuthFromCache(SCatalog* pCtg, SUserAuthInfo *pAuth, SUserAuthRes* pRes, bool* exists);
int32_t catalogChkAuthFromCache(SCatalog* pCtg, SUserAuthInfo* pAuth, SUserAuthRes* pRes, bool* exists);
int32_t catalogChkGrant(SCatalog* pCtg, EGrantType grant);
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);

View File

@ -575,8 +575,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0821)
#define TSDB_CODE_GRANT_DUPLICATED_ACTIVE TAOS_DEF_ERROR_CODE(0, 0x0822)
#define TSDB_CODE_GRANT_VIEW_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0823)
#define TSDB_CODE_GRANT_CSV_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0824)
#define TSDB_CODE_GRANT_AUDIT_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0825)
// sync
// #define TSDB_CODE_SYN_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0900) // 2.x

View File

@ -238,6 +238,11 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
TSDB_CHECK_CODE(code, lino, _exit);
}
if (submitTbData.flags & SUBMIT_REQ_FROM_FILE) {
code = grantCheck(TSDB_GRANT_CSV);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t uid;
if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
code = vnodePreprocessCreateTableReq(pVnode, pCoder, btimeMs, &uid);

View File

@ -324,7 +324,6 @@ typedef struct SCtgUserAuth {
typedef struct SCtgGrantCache {
SRWLatch lock;
SGrantHbRsp grantInfo;
uint64_t grantCacheSize;
} SCtgGrantCache;
typedef struct SCatalog {

View File

@ -353,6 +353,39 @@ _return:
CTG_RET(code);
}
int32_t ctgChkGrant(SCatalog* pCtg, EGrantType grant) {
int32_t code = 0;
int32_t flag = 0;
SCtgGrantCache* pCache = &pCtg->grantCache;
CTG_LOCK(CTG_READ, &pCache->lock);
switch (grant) {
case TSDB_GRANT_ALL: {
flag = pCache->grantInfo.flags & GRANT_ALL_FLAG;
break;
}
case TSDB_GRANT_AUDIT: {
flag = pCache->grantInfo.flags & GRANT_AUDIT_FLAG;
break;
}
case TSDB_GRANT_CSV: {
flag = pCache->grantInfo.flags & GRANT_CSV_FLAG;
break;
}
case TSDB_GRANT_VIEW: {
flag = pCache->grantInfo.flags & GRANT_VIEW_FLAG;
break;
}
}
CTG_UNLOCK(CTG_READ, &pCache->lock);
if (flag) code = TSDB_CODE_GRANT_EXPIRED;
_return:
CTG_RET(code);
}
int32_t ctgGetTbType(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, int32_t* tbType) {
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
@ -1693,6 +1726,20 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogChkGrant(SCatalog* pCtg, EGrantType grant) {
CTG_API_ENTER();
if (NULL == pCtg) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t code = 0;
CTG_ERR_JRET(ctgChkGrant(pCtg, grant));
_return:
CTG_API_LEAVE(code);
}
int32_t catalogGetServerVersion(SCatalog* pCtg, SRequestConnInfo* pConn, char** pVersion) {
CTG_API_ENTER();

View File

@ -2520,8 +2520,6 @@ int32_t ctgOpUpdateGrantInfo(SCtgCacheOperation *operation) {
goto _return;
}
// CTG_ERR_JRET(ctgAcquireGrantCache(pCtg, &pGrantCache)); // TODO: inc/dec or cacheSize ?
code = ctgWriteGrantInfoToCache(pCtg, pRsp);
_return:

View File

@ -48,7 +48,8 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
void insInitColValues(STableMeta* pTableMeta, SArray* aColValues);
void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey);
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode, bool ignoreColVals);
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode, bool ignoreColVals,
uint32_t insertType);
int32_t initTableColSubmitData(STableDataCxt *pTableCxt);
int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks, bool isRebuild);
int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);

View File

@ -169,7 +169,8 @@ STableDataCxt* smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta) {
STableDataCxt* pTableCxt = NULL;
SVCreateTbReq* pCreateTbReq = NULL;
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, &pCreateTbReq, &pTableCxt, false, false);
sizeof(pTableMeta->uid), pTableMeta, &pCreateTbReq, &pTableCxt, false, false,
((SVnodeModifyOpStmt*)(query->pRoot))->insertType);
if (ret != TSDB_CODE_SUCCESS) {
return NULL;
}
@ -313,7 +314,8 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
STableDataCxt* pTableCxt = NULL;
ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, &pCreateTblReq, &pTableCxt, false, false);
sizeof(pTableMeta->uid), pTableMeta, &pCreateTblReq, &pTableCxt, false, false,
((SVnodeModifyOpStmt*)(query->pRoot))->insertType);
if (ret != TSDB_CODE_SUCCESS) {
buildInvalidOperationMsg(&pBuf, "insGetTableDataCxt error");
goto end;

View File

@ -1333,7 +1333,7 @@ static int32_t preParseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModif
static int32_t getTableDataCxt(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt** pTableCxt) {
if (pCxt->pComCxt->async) {
return insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid),
pStmt->pTableMeta, &pStmt->pCreateTblReq, pTableCxt, false, false);
pStmt->pTableMeta, &pStmt->pCreateTblReq, pTableCxt, false, false, pStmt->insertType);
}
char tbFName[TSDB_TABLE_FNAME_LEN];
@ -1342,7 +1342,7 @@ static int32_t getTableDataCxt(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
pStmt->pTableMeta->uid = 0;
}
return insGetTableDataCxt(pStmt->pTableBlockHashObj, tbFName, strlen(tbFName), pStmt->pTableMeta,
&pStmt->pCreateTblReq, pTableCxt, NULL != pCxt->pComCxt->pStmtCb, false);
&pStmt->pCreateTblReq, pTableCxt, NULL != pCxt->pComCxt->pStmtCb, false, pStmt->insertType);
}
static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt) {
@ -1931,7 +1931,7 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
}
code = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true);
pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true, pStmt->insertType);
initTableColSubmitData(*ppTableDataCxt);
if (code == TSDB_CODE_SUCCESS) {
SRow** pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1);
@ -2139,14 +2139,20 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
}
static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) {
int32_t code = 0;
int32_t numOfRows = 0;
if ((code = catalogChkGrant(pCxt->pComCxt->pCatalog, TSDB_GRANT_CSV)) < 0) {
return code;
}
// init only for file
if (NULL == pStmt->pTableCxtHashObj) {
pStmt->pTableCxtHashObj =
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
int32_t numOfRows = 0;
int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows);
code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows);
if (TSDB_CODE_SUCCESS == code) {
pStmt->totalRowsNum += numOfRows;
pStmt->totalTbNum += 1;
@ -2193,10 +2199,6 @@ static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
return buildInvalidOperationMsg(&pCxt->msg, "proxy mode does not support csv loading");
}
// if ((terrno = grantCheck(TSDB_GRANT_CSV)) < 0) {
// return buildInvalidOperationMsg(&pCxt->msg, terrstr());
// }
NEXT_TOKEN(pStmt->pSql, *pToken);
if (0 == pToken->n || (TK_NK_STRING != pToken->type && TK_NK_ID != pToken->type)) {
return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", pToken->z);
@ -2761,10 +2763,6 @@ static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifyOpSt
int32_t code = TSDB_CODE_SUCCESS;
SRowsDataContext rowsDataCxt;
// if ((code = grantCheck(TSDB_GRANT_CSV)) < 0) {
// return code;
// }
if (!pStmt->stbSyntax) {
STableDataCxt* pTableCxt = NULL;
code = getTableDataCxt(pCxt, pStmt, &pTableCxt);

View File

@ -208,7 +208,7 @@ void insCheckTableDataOrder(STableDataCxt* pTableCxt, TSKEY tsKey) {
void insDestroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); }
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput,
bool colMode, bool ignoreColVals) {
bool colMode, bool ignoreColVals, uint32_t insertType) {
STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
if (NULL == pTableCxt) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -249,6 +249,7 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
} else {
pTableCxt->pData->flags = (pCreateTbReq != NULL && NULL != *pCreateTbReq) ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0;
pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0;
pTableCxt->pData->flags |= insertType == TSDB_QUERY_TYPE_FILE_INSERT ? SUBMIT_REQ_FROM_FILE : 0;
pTableCxt->pData->suid = pTableMeta->suid;
pTableCxt->pData->uid = pTableMeta->uid;
pTableCxt->pData->sver = pTableMeta->sversion;
@ -330,7 +331,7 @@ static void resetColValues(SArray* pValues) {
}
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode, bool ignoreColVals) {
SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode, bool ignoreColVals, uint32_t insertType) {
STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen);
if (NULL != tmp) {
*pTableCxt = *tmp;
@ -339,7 +340,7 @@ int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta*
}
return TSDB_CODE_SUCCESS;
}
int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode, ignoreColVals);
int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode, ignoreColVals, insertType);
if (TSDB_CODE_SUCCESS == code) {
void* pData = *pTableCxt; // deal scan coverity
code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
@ -645,7 +646,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
void* tmp = taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
STableDataCxt* pTableCxt = NULL;
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, pCreateTb, &pTableCxt, true, false);
sizeof(pTableMeta->uid), pTableMeta, pCreateTb, &pTableCxt, true, false, ((SVnodeModifyOpStmt*)(query->pRoot))->insertType);
if (ret != TSDB_CODE_SUCCESS) {
uError("insGetTableDataCxt error");
goto end;

View File

@ -462,8 +462,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_MACHINES_MISMATCH, "Cluster machines mism
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE, "Expire time of optional grant item is too large")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DUPLICATED_ACTIVE, "The active code can't be activated repeatedly")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_LIMITED, "Number of view has reached the licensed upper limit")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CSV_LIMITED, "Csv has reached the licensed upper limit")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_AUDIT_LIMITED, "Audit has reached the licensed upper limit")
// sync
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout")