feat: mnode processing view meta request

This commit is contained in:
dapan1121 2023-10-07 10:40:00 +08:00
parent 0a0f512023
commit deb9a6875b
7 changed files with 91 additions and 30 deletions

View File

@ -27,6 +27,8 @@ void mndCleanupView(SMnode *pMnode);
int32_t mndProcessCreateViewReq(SRpcMsg *pReq);
int32_t mndProcessDropViewReq(SRpcMsg *pReq);
int32_t mndProcessGetViewMetaReq(SRpcMsg *pReq);
#ifdef TD_ENTERPRISE
SViewObj *mndAcquireView(SMnode *pMnode, char *viewName);
@ -40,6 +42,8 @@ int32_t mndViewActionUpdate(SSdb *pSdb, SViewObj *pOldView, SViewObj *pNewView);
int32_t mndProcessCreateViewReqImpl(SCMCreateViewReq* pCreateView, SRpcMsg *pReq);
int32_t mndProcessDropViewReqImpl(SCMDropViewReq* pDropView, SRpcMsg *pReq);
int32_t mndProcessViewMetaReqImpl(SViewMetaReq* pMetaReq, SRpcMsg *pReq);
#endif

View File

@ -77,7 +77,7 @@ int32_t mndProcessDropViewReq(SRpcMsg *pReq) {
#endif
}
static int32_t mndProcessViewMetaReq(SRpcMsg *pReq) {
int32_t mndProcessGetViewMetaReq(SRpcMsg *pReq) {
#ifndef TD_ENTERPRISE
return TSDB_CODE_OPS_NOT_SUPPORT;
#else
@ -85,10 +85,10 @@ static int32_t mndProcessViewMetaReq(SRpcMsg *pReq) {
if (tDeserializeSViewMetaReq(pReq->pCont, pReq->contLen, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
return -1;
}
return mndProcessTableMetaReqImpl(&req, pReq);
return mndProcessViewMetaReqImpl(&req, pReq);
#endif
}

View File

@ -921,8 +921,18 @@ int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex** pIndex, bool syncO
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool clearMeta, bool freeCtg, bool stopQueue, bool syncOp);
int32_t ctgMetaRentInit(SCtgRentMgmt* mgmt, uint32_t rentSec, int8_t type, int32_t size);
int32_t ctgMetaRentAdd(SCtgRentMgmt* mgmt, void* meta, int64_t id, int32_t size);
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare,
__compar_fn_t searchCompare);
int32_t ctgMetaRentGet(SCtgRentMgmt* mgmt, void** res, uint32_t* num, int32_t size);
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare);
void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache);
void ctgRemoveViewRent(SCatalog *pCtg, SCtgDBCache *dbCache);
int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid,
SCtgTbCache *pCache);
int32_t ctgUpdateRentViewVersion(SCatalog *pCtg, char *dbFName, char *viewName, uint64_t dbId, uint64_t viewId,
SCtgViewCache *pCache);
int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq);
int32_t ctgUpdateViewMetaToCache(SCatalog *pCtg, SViewMetaRsp *pRsp, bool syncReq);
int32_t ctgStartUpdateThread();
int32_t ctgRelaunchGetTbMetaTask(SCtgTask* pTask);
void ctgReleaseVgInfoToCache(SCatalog* pCtg, SCtgDBCache* dbCache);
@ -931,9 +941,11 @@ int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp);
int32_t ctgOpDropTbIndex(SCtgCacheOperation* operation);
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation* operation);
int32_t ctgOpClearCache(SCtgCacheOperation* operation);
int32_t ctgOpUpdateViewMeta(SCtgCacheOperation *operation);
int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char* tableName, int32_t* tbType);
int32_t ctgGetTbHashVgroupFromCache(SCatalog* pCtg, const SName* pTableName, SVgroupInfo** pVgroup);
int32_t ctgGetViewsFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgViewsCtx *ctx, int32_t dbIdx,
int32_t *fetchIdx, int32_t baseResIdx, SArray *pList);
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target);
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out,
SCtgTaskReq* tReq);
@ -959,6 +971,8 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** out,
SCtgTask* pTask);
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** out, SCtgTask* pTask);
int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pName, SViewMetaOutput* out,
SCtgTaskReq* tReq);
int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs);
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp,
@ -985,9 +999,11 @@ int32_t ctgGetVgIdsFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, char* dbFNam
void ctgResetTbMetaTask(SCtgTask* pTask);
void ctgFreeDbCache(SCtgDBCache* dbCache);
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);
int32_t ctgViewVersionSortCompare(const void* key1, const void* key2);
int32_t ctgDbCacheInfoSortCompare(const void* key1, const void* key2);
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2);
int32_t ctgDbCacheInfoSearchCompare(const void* key1, const void* key2);
int32_t ctgViewVersionSearchCompare(const void* key1, const void* key2);
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target);
@ -1019,8 +1035,10 @@ void ctgReleaseVgMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCach
void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCache* pCache);
void ctgGetGlobalCacheStat(SCtgCacheStat* pStat);
int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res);
int32_t ctgBuildViewNullRes(SCtgTask* pTask, SCtgViewsCtx* pCtx);
void ctgGetGlobalCacheSize(uint64_t *pSize);
uint64_t ctgGetTbIndexCacheSize(STableIndex *pIndex);
uint64_t ctgGetViewMetaCacheSize(SViewMeta *pMeta);
uint64_t ctgGetTbMetaCacheSize(STableMeta *pMeta);
uint64_t ctgGetDbVgroupCacheSize(SDBVgInfo *pVg);
uint64_t ctgGetUserCacheSize(SGetUserAuthRsp *pAuth);

View File

@ -20,6 +20,30 @@
#include "tref.h"
#include "trpc.h"
void ctgIsTaskDone(SCtgJob* pJob, CTG_TASK_TYPE type, bool* done) {
SCtgTask* pTask = NULL;
*done = true;
CTG_LOCK(CTG_READ, &pJob->taskLock);
int32_t taskNum = taosArrayGetSize(pJob->pTasks);
for (int32_t i = 0; i < taskNum; ++i) {
pTask = taosArrayGet(pJob->pTasks, i);
if (type != pTask->type) {
continue;
}
if (pTask->status != CTG_TASK_DONE) {
*done = false;
break;
}
}
CTG_UNLOCK(CTG_READ, &pJob->taskLock);
}
int32_t ctgInitGetTbMetaTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SCtgTbMetaParam* pParam = (SCtgTbMetaParam*)param;
SName* name = pParam->pName;
@ -1029,6 +1053,20 @@ int32_t ctgDumpSvrVer(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgDumpViewsRes(SCtgTask* pTask) {
if (pTask->subTask) {
return TSDB_CODE_SUCCESS;
}
SCtgJob* pJob = pTask->pJob;
pJob->jobRes.pView = pTask->res;
return TSDB_CODE_SUCCESS;
}
int32_t ctgCallSubCb(SCtgTask* pTask) {
int32_t code = 0;
@ -1816,7 +1854,7 @@ int32_t ctgHandleGetViewsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf*
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
SViewMetaRsp* pRsp = (SViewMetaRsp**)pMsgCtx->out;
SViewMetaRsp* pRsp = *(SViewMetaRsp**)pMsgCtx->out;
SViewMeta* pViewMeta = taosMemoryCalloc(1, sizeof(SViewMeta));
if (NULL == pViewMeta) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
@ -1827,6 +1865,8 @@ int32_t ctgHandleGetViewsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf*
taosMemoryFree(pViewMeta);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
pViewMeta->version = pRsp->version;
pViewMeta->viewId = pRsp->viewId;
ctgUpdateViewMetaToCache(pCtg, pRsp, false);
@ -2682,29 +2722,6 @@ _return:
CTG_RET(code);
}
void ctgIsTaskDone(SCtgJob* pJob, CTG_TASK_TYPE type, bool* done) {
SCtgTask* pTask = NULL;
*done = true;
CTG_LOCK(CTG_READ, &pJob->taskLock);
int32_t taskNum = taosArrayGetSize(pJob->pTasks);
for (int32_t i = 0; i < taskNum; ++i) {
pTask = taosArrayGet(pJob->pTasks, i);
if (type != pTask->type) {
continue;
}
if (pTask->status != CTG_TASK_DONE) {
*done = false;
break;
}
}
CTG_UNLOCK(CTG_READ, &pJob->taskLock);
}
SCtgTask* ctgGetTask(SCtgJob* pJob, int32_t taskId) {
int32_t taskNum = taosArrayGetSize(pJob->pTasks);

View File

@ -1370,6 +1370,7 @@ int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *d
atomic_store_8(&dbCache->deleted, 1);
ctgRemoveStbRent(pCtg, dbCache);
ctgRemoveViewRent(pCtg, dbCache);
ctgFreeDbCache(dbCache);
CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock);
@ -1582,6 +1583,7 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa
}
int32_t ctgWriteViewMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *viewName, SViewMeta *pMeta) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == dbCache->viewCache) {
ctgWarn("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
CTG_ERR_JRET(TSDB_CODE_CTG_DB_DROPPED);
@ -1612,7 +1614,7 @@ int32_t ctgWriteViewMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFN
CTG_LOCK(CTG_WRITE, &pCache->viewLock);
if (pCache->pMeta) {
atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pCache->pMeta));
atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetViewMetaCacheSize(pCache->pMeta));
taosMemoryFree(pCache->pMeta->querySql);
taosMemoryFree(pCache->pMeta);
}
@ -1620,7 +1622,7 @@ int32_t ctgWriteViewMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFN
pCache->pMeta = pMeta;
CTG_UNLOCK(CTG_WRITE, &pCache->viewLock);
atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pMeta));
atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetViewMetaCacheSize(pMeta));
pMeta = NULL;

View File

@ -241,6 +241,25 @@ void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) {
}
}
void ctgRemoveViewRent(SCatalog *pCtg, SCtgDBCache *dbCache) {
if (NULL == dbCache->stbCache) {
return;
}
void *pIter = taosHashIterate(dbCache->viewCache, NULL);
while (pIter) {
uint64_t viewId = ((SCtgViewCache*)pIter)->pMeta->viewId;
if (TSDB_CODE_SUCCESS ==
ctgMetaRentRemove(&pCtg->viewRent, viewId, ctgViewVersionSortCompare, ctgViewVersionSearchCompare)) {
ctgDebug("view removed from rent, viewId:0x%" PRIx64, viewId);
}
pIter = taosHashIterate(dbCache->viewCache, pIter);
}
}
int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid,
SCtgTbCache *pCache) {
SSTableVersion metaRent = {.dbId = dbId, .suid = suid};

View File

@ -2005,6 +2005,7 @@ void ctgGetGlobalCacheSize(uint64_t *pSize) {
}
int32_t ctgBuildViewNullRes(SCtgTask* pTask, SCtgViewsCtx* pCtx) {
SCatalog* pCtg = pTask->pJob->pCtg;
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
for (int32_t i = 0; i < dbNum; ++i) {
STablesReq* pReq = taosArrayGet(pCtx->pNames, i);