diff --git a/source/dnode/mnode/impl/inc/mndView.h b/source/dnode/mnode/impl/inc/mndView.h index bd21d85806..ebc28f8898 100755 --- a/source/dnode/mnode/impl/inc/mndView.h +++ b/source/dnode/mnode/impl/inc/mndView.h @@ -27,6 +27,8 @@ void mndCleanupView(SMnode *pMnode); int32_t mndProcessCreateViewReq(SRpcMsg *pReq); int32_t mndProcessDropViewReq(SRpcMsg *pReq); +int32_t mndProcessGetViewMetaReq(SRpcMsg *pReq); + #ifdef TD_ENTERPRISE SViewObj *mndAcquireView(SMnode *pMnode, char *viewName); @@ -40,6 +42,8 @@ int32_t mndViewActionUpdate(SSdb *pSdb, SViewObj *pOldView, SViewObj *pNewView); int32_t mndProcessCreateViewReqImpl(SCMCreateViewReq* pCreateView, SRpcMsg *pReq); int32_t mndProcessDropViewReqImpl(SCMDropViewReq* pDropView, SRpcMsg *pReq); +int32_t mndProcessViewMetaReqImpl(SViewMetaReq* pMetaReq, SRpcMsg *pReq); + #endif diff --git a/source/dnode/mnode/impl/src/mndView.c b/source/dnode/mnode/impl/src/mndView.c index 9955284917..a700ce2526 100755 --- a/source/dnode/mnode/impl/src/mndView.c +++ b/source/dnode/mnode/impl/src/mndView.c @@ -77,7 +77,7 @@ int32_t mndProcessDropViewReq(SRpcMsg *pReq) { #endif } -static int32_t mndProcessViewMetaReq(SRpcMsg *pReq) { +int32_t mndProcessGetViewMetaReq(SRpcMsg *pReq) { #ifndef TD_ENTERPRISE return TSDB_CODE_OPS_NOT_SUPPORT; #else @@ -85,10 +85,10 @@ static int32_t mndProcessViewMetaReq(SRpcMsg *pReq) { if (tDeserializeSViewMetaReq(pReq->pCont, pReq->contLen, &req) != 0) { terrno = TSDB_CODE_INVALID_MSG; - goto _OVER; + return -1; } - return mndProcessTableMetaReqImpl(&req, pReq); + return mndProcessViewMetaReqImpl(&req, pReq); #endif } diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 72d26cdd5a..c98683490a 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -921,8 +921,18 @@ int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex** pIndex, bool syncO int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool clearMeta, bool freeCtg, bool stopQueue, bool syncOp); int32_t ctgMetaRentInit(SCtgRentMgmt* mgmt, uint32_t rentSec, int8_t type, int32_t size); int32_t ctgMetaRentAdd(SCtgRentMgmt* mgmt, void* meta, int64_t id, int32_t size); +int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare, + __compar_fn_t searchCompare); int32_t ctgMetaRentGet(SCtgRentMgmt* mgmt, void** res, uint32_t* num, int32_t size); +int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare); +void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache); +void ctgRemoveViewRent(SCatalog *pCtg, SCtgDBCache *dbCache); +int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid, + SCtgTbCache *pCache); +int32_t ctgUpdateRentViewVersion(SCatalog *pCtg, char *dbFName, char *viewName, uint64_t dbId, uint64_t viewId, + SCtgViewCache *pCache); int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq); +int32_t ctgUpdateViewMetaToCache(SCatalog *pCtg, SViewMetaRsp *pRsp, bool syncReq); int32_t ctgStartUpdateThread(); int32_t ctgRelaunchGetTbMetaTask(SCtgTask* pTask); void ctgReleaseVgInfoToCache(SCatalog* pCtg, SCtgDBCache* dbCache); @@ -931,9 +941,11 @@ int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp); int32_t ctgOpDropTbIndex(SCtgCacheOperation* operation); int32_t ctgOpUpdateTbIndex(SCtgCacheOperation* operation); int32_t ctgOpClearCache(SCtgCacheOperation* operation); +int32_t ctgOpUpdateViewMeta(SCtgCacheOperation *operation); int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char* tableName, int32_t* tbType); int32_t ctgGetTbHashVgroupFromCache(SCatalog* pCtg, const SName* pTableName, SVgroupInfo** pVgroup); - +int32_t ctgGetViewsFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgViewsCtx *ctx, int32_t dbIdx, + int32_t *fetchIdx, int32_t baseResIdx, SArray *pList); int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target); int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out, SCtgTaskReq* tReq); @@ -959,6 +971,8 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** out, SCtgTask* pTask); int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** out, SCtgTask* pTask); +int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pName, SViewMetaOutput* out, + SCtgTaskReq* tReq); int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs); int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, @@ -985,9 +999,11 @@ int32_t ctgGetVgIdsFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, char* dbFNam void ctgResetTbMetaTask(SCtgTask* pTask); void ctgFreeDbCache(SCtgDBCache* dbCache); int32_t ctgStbVersionSortCompare(const void* key1, const void* key2); +int32_t ctgViewVersionSortCompare(const void* key1, const void* key2); int32_t ctgDbCacheInfoSortCompare(const void* key1, const void* key2); int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2); int32_t ctgDbCacheInfoSearchCompare(const void* key1, const void* key2); +int32_t ctgViewVersionSearchCompare(const void* key1, const void* key2); void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput); int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target); int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target); @@ -1019,8 +1035,10 @@ void ctgReleaseVgMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCach void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCache* pCache); void ctgGetGlobalCacheStat(SCtgCacheStat* pStat); int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res); +int32_t ctgBuildViewNullRes(SCtgTask* pTask, SCtgViewsCtx* pCtx); void ctgGetGlobalCacheSize(uint64_t *pSize); uint64_t ctgGetTbIndexCacheSize(STableIndex *pIndex); +uint64_t ctgGetViewMetaCacheSize(SViewMeta *pMeta); uint64_t ctgGetTbMetaCacheSize(STableMeta *pMeta); uint64_t ctgGetDbVgroupCacheSize(SDBVgInfo *pVg); uint64_t ctgGetUserCacheSize(SGetUserAuthRsp *pAuth); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 9df8a51128..945b84e0fe 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -20,6 +20,30 @@ #include "tref.h" #include "trpc.h" + +void ctgIsTaskDone(SCtgJob* pJob, CTG_TASK_TYPE type, bool* done) { + SCtgTask* pTask = NULL; + + *done = true; + + CTG_LOCK(CTG_READ, &pJob->taskLock); + + int32_t taskNum = taosArrayGetSize(pJob->pTasks); + for (int32_t i = 0; i < taskNum; ++i) { + pTask = taosArrayGet(pJob->pTasks, i); + if (type != pTask->type) { + continue; + } + + if (pTask->status != CTG_TASK_DONE) { + *done = false; + break; + } + } + + CTG_UNLOCK(CTG_READ, &pJob->taskLock); +} + int32_t ctgInitGetTbMetaTask(SCtgJob* pJob, int32_t taskIdx, void* param) { SCtgTbMetaParam* pParam = (SCtgTbMetaParam*)param; SName* name = pParam->pName; @@ -1029,6 +1053,20 @@ int32_t ctgDumpSvrVer(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } + +int32_t ctgDumpViewsRes(SCtgTask* pTask) { + if (pTask->subTask) { + return TSDB_CODE_SUCCESS; + } + + SCtgJob* pJob = pTask->pJob; + + pJob->jobRes.pView = pTask->res; + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgCallSubCb(SCtgTask* pTask) { int32_t code = 0; @@ -1816,7 +1854,7 @@ int32_t ctgHandleGetViewsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); - SViewMetaRsp* pRsp = (SViewMetaRsp**)pMsgCtx->out; + SViewMetaRsp* pRsp = *(SViewMetaRsp**)pMsgCtx->out; SViewMeta* pViewMeta = taosMemoryCalloc(1, sizeof(SViewMeta)); if (NULL == pViewMeta) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); @@ -1827,6 +1865,8 @@ int32_t ctgHandleGetViewsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* taosMemoryFree(pViewMeta); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } + pViewMeta->version = pRsp->version; + pViewMeta->viewId = pRsp->viewId; ctgUpdateViewMetaToCache(pCtg, pRsp, false); @@ -2682,29 +2722,6 @@ _return: CTG_RET(code); } -void ctgIsTaskDone(SCtgJob* pJob, CTG_TASK_TYPE type, bool* done) { - SCtgTask* pTask = NULL; - - *done = true; - - CTG_LOCK(CTG_READ, &pJob->taskLock); - - int32_t taskNum = taosArrayGetSize(pJob->pTasks); - for (int32_t i = 0; i < taskNum; ++i) { - pTask = taosArrayGet(pJob->pTasks, i); - if (type != pTask->type) { - continue; - } - - if (pTask->status != CTG_TASK_DONE) { - *done = false; - break; - } - } - - CTG_UNLOCK(CTG_READ, &pJob->taskLock); -} - SCtgTask* ctgGetTask(SCtgJob* pJob, int32_t taskId) { int32_t taskNum = taosArrayGetSize(pJob->pTasks); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 4c0adf73eb..f6e801d59c 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -1370,6 +1370,7 @@ int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *d atomic_store_8(&dbCache->deleted, 1); ctgRemoveStbRent(pCtg, dbCache); + ctgRemoveViewRent(pCtg, dbCache); ctgFreeDbCache(dbCache); CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock); @@ -1582,6 +1583,7 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa } int32_t ctgWriteViewMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *viewName, SViewMeta *pMeta) { + int32_t code = TSDB_CODE_SUCCESS; if (NULL == dbCache->viewCache) { ctgWarn("db is dropping, dbId:0x%" PRIx64, dbCache->dbId); CTG_ERR_JRET(TSDB_CODE_CTG_DB_DROPPED); @@ -1612,7 +1614,7 @@ int32_t ctgWriteViewMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFN CTG_LOCK(CTG_WRITE, &pCache->viewLock); if (pCache->pMeta) { - atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pCache->pMeta)); + atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetViewMetaCacheSize(pCache->pMeta)); taosMemoryFree(pCache->pMeta->querySql); taosMemoryFree(pCache->pMeta); } @@ -1620,7 +1622,7 @@ int32_t ctgWriteViewMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFN pCache->pMeta = pMeta; CTG_UNLOCK(CTG_WRITE, &pCache->viewLock); - atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pMeta)); + atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetViewMetaCacheSize(pMeta)); pMeta = NULL; diff --git a/source/libs/catalog/src/ctgRent.c b/source/libs/catalog/src/ctgRent.c index ba7e02e333..457285b147 100755 --- a/source/libs/catalog/src/ctgRent.c +++ b/source/libs/catalog/src/ctgRent.c @@ -241,6 +241,25 @@ void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) { } } +void ctgRemoveViewRent(SCatalog *pCtg, SCtgDBCache *dbCache) { + if (NULL == dbCache->stbCache) { + return; + } + + void *pIter = taosHashIterate(dbCache->viewCache, NULL); + while (pIter) { + uint64_t viewId = ((SCtgViewCache*)pIter)->pMeta->viewId; + + if (TSDB_CODE_SUCCESS == + ctgMetaRentRemove(&pCtg->viewRent, viewId, ctgViewVersionSortCompare, ctgViewVersionSearchCompare)) { + ctgDebug("view removed from rent, viewId:0x%" PRIx64, viewId); + } + + pIter = taosHashIterate(dbCache->viewCache, pIter); + } +} + + int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid, SCtgTbCache *pCache) { SSTableVersion metaRent = {.dbId = dbId, .suid = suid}; diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index fd87caaa5d..de8bf7f954 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -2005,6 +2005,7 @@ void ctgGetGlobalCacheSize(uint64_t *pSize) { } int32_t ctgBuildViewNullRes(SCtgTask* pTask, SCtgViewsCtx* pCtx) { + SCatalog* pCtg = pTask->pJob->pCtg; int32_t dbNum = taosArrayGetSize(pCtx->pNames); for (int32_t i = 0; i < dbNum; ++i) { STablesReq* pReq = taosArrayGet(pCtx->pNames, i);