feat: support uniq grant

This commit is contained in:
kailixu 2024-02-02 08:41:45 +08:00
parent 231cf000da
commit 44d264456d
5 changed files with 170 additions and 22 deletions

View File

@ -383,6 +383,7 @@ int32_t catalogRemoveViewMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId
int32_t catalogUpdateDynViewVer(SCatalog* pCtg, SDynViewVersion* pVer);
int32_t catalogUpdateViewMeta(SCatalog* pCtg, SViewMetaRsp* pMsg);
int32_t catalogUpdateGrantInfo(SCatalog* pCtg, SGrantHbRsp* pMsg);
int32_t catalogAsyncUpdateViewMeta(SCatalog* pCtg, SViewMetaRsp* pMsg);

View File

@ -327,37 +327,26 @@ static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatal
return TSDB_CODE_SUCCESS;
}
#if 0
static int32_t hbProcessGrantInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t code = 0;
SGrantHbRsp hbRsp = {0};
if (tDeserializeSGrantHbRsp(value, valueLen, &hbRsp) != 0) {
taosArrayDestroyEx(hbRsp.pViewRsp, hbFreeSViewMetaInRsp);
SGrantHbRsp *hbRsp = taosMemoryCalloc(1, sizeof(SGrantHbRsp));
if (!hbRsp) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tDeserializeSGrantHbRsp(value, valueLen, hbRsp) != 0) {
taosMemoryFree(hbRsp);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
int32_t numOfMeta = taosArrayGetSize(hbRsp.pViewRsp);
for (int32_t i = 0; i < numOfMeta; ++i) {
SViewMetaRsp *rsp = taosArrayGetP(hbRsp.pViewRsp, i);
tscInfo("hb to update grant info:%u", hbRsp->flags);
catalogUpdateGrantInfo(pCatalog, hbRsp);
if (rsp->numOfCols < 0) {
tscDebug("hb to remove view, db:%s, view:%s", rsp->dbFName, rsp->name);
catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId);
tFreeSViewMetaRsp(rsp);
taosMemoryFreeClear(rsp);
} else {
tscDebug("hb to update view, db:%s, view:%s", rsp->dbFName, rsp->name);
catalogUpdateViewMeta(pCatalog, rsp);
}
}
taosArrayDestroy(hbRsp.pViewRsp);
return TSDB_CODE_SUCCESS;
}
#endif
static void hbProcessQueryRspKvs(int32_t kvNum, SArray* pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) {
for (int32_t i = 0; i < kvNum; ++i) {
@ -415,7 +404,7 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray* pKvs, struct SCatalog *p
break;
}
// hbProcessGrantInfoRsp(kv->value, kv->valueLen, pCatalog);
hbProcessGrantInfoRsp(kv->value, kv->valueLen, pCatalog);
break;
}
#endif

View File

@ -69,6 +69,7 @@ typedef enum {
CTG_CI_UDF,
CTG_CI_SVR_VER,
CTG_CI_VIEW,
CTG_CI_GRANT_INFO,
CTG_CI_MAX_VALUE,
} CTG_CACHE_ITEM;
@ -101,6 +102,8 @@ enum {
CTG_OP_DROP_TB_INDEX,
CTG_OP_UPDATE_VIEW_META,
CTG_OP_DROP_VIEW_META,
CTG_OP_UPDATE_GRANT_INFO,
CTG_OP_DROP_GRANT_INFO,
CTG_OP_CLEAR_CACHE,
CTG_OP_MAX
};
@ -123,6 +126,7 @@ typedef enum {
CTG_TASK_GET_TB_HASH_BATCH,
CTG_TASK_GET_TB_TAG,
CTG_TASK_GET_VIEW,
CTG_TASK_GET_GRANT_INFO,
} CTG_TASK_TYPE;
typedef enum {
@ -322,9 +326,12 @@ typedef struct SCatalog {
SDynViewVersion dynViewVer;
SHashObj* userCache; // key:user, value:SCtgUserAuth
SHashObj* dbCache; // key:dbname, value:SCtgDBCache
SHashObj* grantCache;
SGrantHbRsp _grantCache;
SCtgRentMgmt dbRent;
SCtgRentMgmt stbRent;
SCtgRentMgmt viewRent;
SCtgRentMgmt grantRent;
SCtgCacheStat cacheStat;
} SCatalog;
@ -548,6 +555,10 @@ typedef struct SCtgDropViewMetaMsg {
uint64_t viewId;
} SCtgDropViewMetaMsg;
typedef struct SCtgUpdateGrantInfoMsg {
SCatalog* pCtg;
SGrantHbRsp* pRsp;
} SCtgUpdateGrantInfoMsg;
typedef struct SCtgCacheOperation {
int32_t opId;
@ -948,6 +959,7 @@ int32_t ctgUpdateRentViewVersion(SCatalog *pCtg, char *dbFName, char *viewName,
SCtgViewCache *pCache);
int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq);
int32_t ctgUpdateViewMetaToCache(SCatalog *pCtg, SViewMetaRsp *pRsp, bool syncReq);
int32_t ctgUpdateGrantInfoToCache(SCatalog *pCtg, SGrantHbRsp *pRsp, bool syncReq);
int32_t ctgStartUpdateThread();
int32_t ctgRelaunchGetTbMetaTask(SCtgTask* pTask);
void ctgReleaseVgInfoToCache(SCatalog* pCtg, SCtgDBCache* dbCache);

View File

@ -1747,6 +1747,21 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogUpdateGrantInfo(SCatalog* pCtg, SGrantHbRsp* pMsg) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pMsg) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t code = 0;
CTG_ERR_JRET(ctgUpdateGrantInfoToCache(pCtg, pMsg, true));
_return:
CTG_API_LEAVE(code);
}
int32_t catalogAsyncUpdateViewMeta(SCatalog* pCtg, SViewMetaRsp* pMsg) {
CTG_API_ENTER();

View File

@ -32,6 +32,8 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v
{CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex},
{CTG_OP_UPDATE_VIEW_META, "update viewMeta", ctgOpUpdateViewMeta},
{CTG_OP_DROP_VIEW_META, "drop viewMeta", ctgOpDropViewMeta},
{CTG_OP_UPDATE_GRANT_INFO, "update grantInfo", ctgOpUpdateGrantInfo},
{CTG_OP_DROP_GRANT_INFO, "drop grantInfo", ctgOpDropGrantInfo},
{CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}};
SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = {
@ -1323,6 +1325,33 @@ _return:
CTG_RET(code);
}
int32_t ctgUpdateGrantInfoEnqueue(SCatalog *pCtg, SGrantHbRsp *pRsp, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_GRANT_INFO;
op->syncOp = syncOp;
SCtgUpdateGrantInfoMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateGrantInfoMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateGrantInfoMsg));
taosMemoryFree(op);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
msg->pCtg = pCtg;
msg->pRsp = pRsp;
op->data = msg;
CTG_ERR_RET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFree(pRsp);
CTG_RET(code);
}
int32_t ctgDropViewMetaEnqueue(SCatalog *pCtg, const char *dbFName, uint64_t dbId, const char *viewName, uint64_t viewId, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
@ -1715,6 +1744,10 @@ int32_t ctgUpdateViewMetaToCache(SCatalog *pCtg, SViewMetaRsp *pRsp, bool syncRe
CTG_RET(ctgUpdateViewMetaEnqueue(pCtg, pRsp, syncReq));
}
int32_t ctgUpdateGrantInfoToCache(SCatalog *pCtg, SGrantHbRsp *pRsp, bool syncReq) {
CTG_RET(ctgUpdateGrantInfoEnqueue(pCtg, pRsp, syncReq));
}
void ctgClearAllHandles(void) {
SCatalog *pCtg = NULL;
@ -2450,6 +2483,104 @@ _return:
CTG_RET(code);
}
int32_t ctgOpUpdateGrantInfo(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgUpdateGrantInfoMsg *msg = operation->data;
SCatalog *pCtg = msg->pCtg;
SGrantHbRsp *pRsp = msg->pRsp;
SCtgDBCache *dbCache = NULL;
SViewMeta *pMeta = NULL;
taosMemoryFreeClear(msg);
if (pCtg->stopUpdate) {
goto _return;
}
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pRsp->dbFName, pRsp->dbId, &dbCache));
if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pRsp->dbFName, pRsp->dbId);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
pMeta = taosMemoryCalloc(1, sizeof(SViewMeta));
if (NULL == pMeta) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_JRET(dupViewMetaFromRsp(pRsp, pMeta));
code = ctgWriteViewMetaToCache(pCtg, dbCache, pRsp->dbFName, pRsp->name, pMeta);
pMeta = NULL;
_return:
tFreeSViewMetaRsp(pRsp);
taosMemoryFree(pRsp);
ctgFreeSViewMeta(pMeta);
taosMemoryFree(pMeta);
CTG_RET(code);
}
int32_t ctgOpDropViewMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgDropViewMetaMsg *msg = operation->data;
SCatalog *pCtg = msg->pCtg;
int32_t tblType = 0;
if (pCtg->stopUpdate) {
goto _return;
}
SCtgDBCache *dbCache = NULL;
ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
if (NULL == dbCache) {
goto _return;
}
if ((0 != msg->dbId) && (dbCache->dbId != msg->dbId)) {
ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, viewName:%s", msg->dbId, dbCache->dbId,
msg->dbFName, msg->viewName);
goto _return;
}
SCtgViewCache *pViewCache = taosHashGet(dbCache->viewCache, msg->viewName, strlen(msg->viewName));
if (NULL == pViewCache) {
ctgDebug("view %s already not in cache", msg->viewName);
goto _return;
}
int64_t viewId = pViewCache->pMeta->viewId;
if (0 != msg->viewId && viewId != msg->viewId) {
ctgDebug("viewId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", viewName:%s", msg->viewId, viewId, msg->viewName);
goto _return;
}
atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetViewMetaCacheSize(pViewCache->pMeta));
ctgFreeViewCacheImpl(pViewCache, true);
if (taosHashRemove(dbCache->viewCache, msg->viewName, strlen(msg->viewName))) {
ctgError("view %s not exist in cache, dbFName:%s", msg->viewName, msg->dbFName);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} else {
atomic_sub_fetch_64(&dbCache->dbCacheSize, sizeof(SCtgViewCache) + strlen(msg->viewName));
CTG_DB_NUM_DEC(CTG_CI_VIEW);
}
ctgDebug("view %s removed from cache, dbFName:%s", msg->viewName, msg->dbFName);
CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->viewRent, viewId, ctgViewVersionSortCompare, ctgViewVersionSearchCompare));
ctgDebug("view %s removed from rent, dbFName:%s, viewId:0x%" PRIx64, msg->viewName, msg->dbFName, viewId);
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
void ctgClearFreeCache(SCtgCacheOperation *operation) {
SCtgClearCacheMsg *msg = operation->data;