From 44d264456d951d1ce28770569bd5196cf44f59f3 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 2 Feb 2024 08:41:45 +0800 Subject: [PATCH] feat: support uniq grant --- include/libs/catalog/catalog.h | 1 + source/client/src/clientHb.c | 33 +++---- source/libs/catalog/inc/catalogInt.h | 12 +++ source/libs/catalog/src/catalog.c | 15 +++ source/libs/catalog/src/ctgCache.c | 131 +++++++++++++++++++++++++++ 5 files changed, 170 insertions(+), 22 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 3649f369b7..2e013413a3 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -383,6 +383,7 @@ int32_t catalogRemoveViewMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId int32_t catalogUpdateDynViewVer(SCatalog* pCtg, SDynViewVersion* pVer); int32_t catalogUpdateViewMeta(SCatalog* pCtg, SViewMetaRsp* pMsg); +int32_t catalogUpdateGrantInfo(SCatalog* pCtg, SGrantHbRsp* pMsg); int32_t catalogAsyncUpdateViewMeta(SCatalog* pCtg, SViewMetaRsp* pMsg); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index f4c2fbec5a..755ab0eb91 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -327,37 +327,26 @@ static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatal return TSDB_CODE_SUCCESS; } -#if 0 static int32_t hbProcessGrantInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { int32_t code = 0; - SGrantHbRsp hbRsp = {0}; - if (tDeserializeSGrantHbRsp(value, valueLen, &hbRsp) != 0) { - taosArrayDestroyEx(hbRsp.pViewRsp, hbFreeSViewMetaInRsp); + SGrantHbRsp *hbRsp = taosMemoryCalloc(1, sizeof(SGrantHbRsp)); + if (!hbRsp) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (tDeserializeSGrantHbRsp(value, valueLen, hbRsp) != 0) { + taosMemoryFree(hbRsp); terrno = TSDB_CODE_INVALID_MSG; return -1; } - int32_t numOfMeta = taosArrayGetSize(hbRsp.pViewRsp); - for (int32_t i = 0; i < numOfMeta; ++i) { - SViewMetaRsp *rsp = taosArrayGetP(hbRsp.pViewRsp, i); + tscInfo("hb to update grant info:%u", hbRsp->flags); + catalogUpdateGrantInfo(pCatalog, hbRsp); - if (rsp->numOfCols < 0) { - tscDebug("hb to remove view, db:%s, view:%s", rsp->dbFName, rsp->name); - catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId); - tFreeSViewMetaRsp(rsp); - taosMemoryFreeClear(rsp); - } else { - tscDebug("hb to update view, db:%s, view:%s", rsp->dbFName, rsp->name); - catalogUpdateViewMeta(pCatalog, rsp); - } - } - - taosArrayDestroy(hbRsp.pViewRsp); return TSDB_CODE_SUCCESS; } -#endif - static void hbProcessQueryRspKvs(int32_t kvNum, SArray* pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) { for (int32_t i = 0; i < kvNum; ++i) { @@ -415,7 +404,7 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray* pKvs, struct SCatalog *p break; } - // hbProcessGrantInfoRsp(kv->value, kv->valueLen, pCatalog); + hbProcessGrantInfoRsp(kv->value, kv->valueLen, pCatalog); break; } #endif diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index f9f4ee7dfc..b9a0752178 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -69,6 +69,7 @@ typedef enum { CTG_CI_UDF, CTG_CI_SVR_VER, CTG_CI_VIEW, + CTG_CI_GRANT_INFO, CTG_CI_MAX_VALUE, } CTG_CACHE_ITEM; @@ -101,6 +102,8 @@ enum { CTG_OP_DROP_TB_INDEX, CTG_OP_UPDATE_VIEW_META, CTG_OP_DROP_VIEW_META, + CTG_OP_UPDATE_GRANT_INFO, + CTG_OP_DROP_GRANT_INFO, CTG_OP_CLEAR_CACHE, CTG_OP_MAX }; @@ -123,6 +126,7 @@ typedef enum { CTG_TASK_GET_TB_HASH_BATCH, CTG_TASK_GET_TB_TAG, CTG_TASK_GET_VIEW, + CTG_TASK_GET_GRANT_INFO, } CTG_TASK_TYPE; typedef enum { @@ -322,9 +326,12 @@ typedef struct SCatalog { SDynViewVersion dynViewVer; SHashObj* userCache; // key:user, value:SCtgUserAuth SHashObj* dbCache; // key:dbname, value:SCtgDBCache + SHashObj* grantCache; + SGrantHbRsp _grantCache; SCtgRentMgmt dbRent; SCtgRentMgmt stbRent; SCtgRentMgmt viewRent; + SCtgRentMgmt grantRent; SCtgCacheStat cacheStat; } SCatalog; @@ -548,6 +555,10 @@ typedef struct SCtgDropViewMetaMsg { uint64_t viewId; } SCtgDropViewMetaMsg; +typedef struct SCtgUpdateGrantInfoMsg { + SCatalog* pCtg; + SGrantHbRsp* pRsp; +} SCtgUpdateGrantInfoMsg; typedef struct SCtgCacheOperation { int32_t opId; @@ -948,6 +959,7 @@ int32_t ctgUpdateRentViewVersion(SCatalog *pCtg, char *dbFName, char *viewName, SCtgViewCache *pCache); int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq); int32_t ctgUpdateViewMetaToCache(SCatalog *pCtg, SViewMetaRsp *pRsp, bool syncReq); +int32_t ctgUpdateGrantInfoToCache(SCatalog *pCtg, SGrantHbRsp *pRsp, bool syncReq); int32_t ctgStartUpdateThread(); int32_t ctgRelaunchGetTbMetaTask(SCtgTask* pTask); void ctgReleaseVgInfoToCache(SCatalog* pCtg, SCtgDBCache* dbCache); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index c2d88e5ce3..f52787a61e 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1747,6 +1747,21 @@ _return: CTG_API_LEAVE(code); } +int32_t catalogUpdateGrantInfo(SCatalog* pCtg, SGrantHbRsp* pMsg) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pMsg) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + int32_t code = 0; + CTG_ERR_JRET(ctgUpdateGrantInfoToCache(pCtg, pMsg, true)); + +_return: + + CTG_API_LEAVE(code); +} + int32_t catalogAsyncUpdateViewMeta(SCatalog* pCtg, SViewMetaRsp* pMsg) { CTG_API_ENTER(); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 1b693b4e07..05078fabb4 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -32,6 +32,8 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v {CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex}, {CTG_OP_UPDATE_VIEW_META, "update viewMeta", ctgOpUpdateViewMeta}, {CTG_OP_DROP_VIEW_META, "drop viewMeta", ctgOpDropViewMeta}, + {CTG_OP_UPDATE_GRANT_INFO, "update grantInfo", ctgOpUpdateGrantInfo}, + {CTG_OP_DROP_GRANT_INFO, "drop grantInfo", ctgOpDropGrantInfo}, {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}}; SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = { @@ -1323,6 +1325,33 @@ _return: CTG_RET(code); } +int32_t ctgUpdateGrantInfoEnqueue(SCatalog *pCtg, SGrantHbRsp *pRsp, bool syncOp) { + int32_t code = 0; + SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); + op->opId = CTG_OP_UPDATE_GRANT_INFO; + op->syncOp = syncOp; + + SCtgUpdateGrantInfoMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateGrantInfoMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateGrantInfoMsg)); + taosMemoryFree(op); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + msg->pCtg = pCtg; + msg->pRsp = pRsp; + + op->data = msg; + + CTG_ERR_RET(ctgEnqueue(pCtg, op)); + + return TSDB_CODE_SUCCESS; +_return: + + taosMemoryFree(pRsp); + CTG_RET(code); +} + int32_t ctgDropViewMetaEnqueue(SCatalog *pCtg, const char *dbFName, uint64_t dbId, const char *viewName, uint64_t viewId, bool syncOp) { int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); @@ -1715,6 +1744,10 @@ int32_t ctgUpdateViewMetaToCache(SCatalog *pCtg, SViewMetaRsp *pRsp, bool syncRe CTG_RET(ctgUpdateViewMetaEnqueue(pCtg, pRsp, syncReq)); } +int32_t ctgUpdateGrantInfoToCache(SCatalog *pCtg, SGrantHbRsp *pRsp, bool syncReq) { + CTG_RET(ctgUpdateGrantInfoEnqueue(pCtg, pRsp, syncReq)); +} + void ctgClearAllHandles(void) { SCatalog *pCtg = NULL; @@ -2450,6 +2483,104 @@ _return: CTG_RET(code); } +int32_t ctgOpUpdateGrantInfo(SCtgCacheOperation *operation) { + int32_t code = 0; + SCtgUpdateGrantInfoMsg *msg = operation->data; + SCatalog *pCtg = msg->pCtg; + SGrantHbRsp *pRsp = msg->pRsp; + SCtgDBCache *dbCache = NULL; + SViewMeta *pMeta = NULL; + + taosMemoryFreeClear(msg); + + if (pCtg->stopUpdate) { + goto _return; + } + + CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pRsp->dbFName, pRsp->dbId, &dbCache)); + if (NULL == dbCache) { + ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pRsp->dbFName, pRsp->dbId); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + pMeta = taosMemoryCalloc(1, sizeof(SViewMeta)); + if (NULL == pMeta) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + CTG_ERR_JRET(dupViewMetaFromRsp(pRsp, pMeta)); + + code = ctgWriteViewMetaToCache(pCtg, dbCache, pRsp->dbFName, pRsp->name, pMeta); + pMeta = NULL; + +_return: + + tFreeSViewMetaRsp(pRsp); + taosMemoryFree(pRsp); + ctgFreeSViewMeta(pMeta); + taosMemoryFree(pMeta); + + CTG_RET(code); +} + +int32_t ctgOpDropViewMeta(SCtgCacheOperation *operation) { + int32_t code = 0; + SCtgDropViewMetaMsg *msg = operation->data; + SCatalog *pCtg = msg->pCtg; + int32_t tblType = 0; + + if (pCtg->stopUpdate) { + goto _return; + } + + SCtgDBCache *dbCache = NULL; + ctgGetDBCache(pCtg, msg->dbFName, &dbCache); + if (NULL == dbCache) { + goto _return; + } + + if ((0 != msg->dbId) && (dbCache->dbId != msg->dbId)) { + ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, viewName:%s", msg->dbId, dbCache->dbId, + msg->dbFName, msg->viewName); + goto _return; + } + + SCtgViewCache *pViewCache = taosHashGet(dbCache->viewCache, msg->viewName, strlen(msg->viewName)); + if (NULL == pViewCache) { + ctgDebug("view %s already not in cache", msg->viewName); + goto _return; + } + + int64_t viewId = pViewCache->pMeta->viewId; + if (0 != msg->viewId && viewId != msg->viewId) { + ctgDebug("viewId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", viewName:%s", msg->viewId, viewId, msg->viewName); + goto _return; + } + + atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetViewMetaCacheSize(pViewCache->pMeta)); + ctgFreeViewCacheImpl(pViewCache, true); + + if (taosHashRemove(dbCache->viewCache, msg->viewName, strlen(msg->viewName))) { + ctgError("view %s not exist in cache, dbFName:%s", msg->viewName, msg->dbFName); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } else { + atomic_sub_fetch_64(&dbCache->dbCacheSize, sizeof(SCtgViewCache) + strlen(msg->viewName)); + CTG_DB_NUM_DEC(CTG_CI_VIEW); + } + + ctgDebug("view %s removed from cache, dbFName:%s", msg->viewName, msg->dbFName); + + CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->viewRent, viewId, ctgViewVersionSortCompare, ctgViewVersionSearchCompare)); + + ctgDebug("view %s removed from rent, dbFName:%s, viewId:0x%" PRIx64, msg->viewName, msg->dbFName, viewId); + +_return: + + taosMemoryFreeClear(msg); + + CTG_RET(code); +} + void ctgClearFreeCache(SCtgCacheOperation *operation) { SCtgClearCacheMsg *msg = operation->data;