diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 34e9c1014b..3133e3b300 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -107,6 +107,8 @@ enum { HEARTBEAT_KEY_DBINFO, HEARTBEAT_KEY_STBINFO, HEARTBEAT_KEY_TMQ, + HEARTBEAT_KEY_DYN_VIEW, + HEARTBEAT_KEY_VIEWINFO, }; typedef enum _mgmt_table { diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index a36f5e9e3b..7877e0accf 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -145,6 +145,11 @@ typedef struct SDbCacheInfo { int64_t stateTs; } SDbCacheInfo; +typedef struct SDynViewVersion { + int64_t svrBootTs; + uint64_t dynViewVer; +} SDynViewVersion; + typedef struct SViewVersion { char dbFName[TSDB_DB_FNAME_LEN]; char viewName[TSDB_VIEW_NAME_LEN]; @@ -322,6 +327,8 @@ int32_t catalogGetDnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray* int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableVersion** stables, uint32_t* num); +int32_t catalogGetExpiredViews(SCatalog* pCtg, SViewVersion** views, uint32_t* num, SDynViewVersion** dynViewVersion); + int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbCacheInfo** dbs, uint32_t* num); int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion** users, uint32_t* num); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index dfcca2f36e..90c20b9d61 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -740,8 +740,8 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC for (int32_t i = 0; i < stbNum; ++i) { SSTableVersion *stb = &stbs[i]; stb->suid = htobe64(stb->suid); - stb->sversion = htons(stb->sversion); - stb->tversion = htons(stb->tversion); + stb->sversion = htonl(stb->sversion); + stb->tversion = htonl(stb->tversion); stb->smaVer = htonl(stb->smaVer); } @@ -762,6 +762,55 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC return TSDB_CODE_SUCCESS; } +int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { + SViewVersion *views = NULL; + uint32_t viewNum = 0; + int32_t code = 0; + SDynViewVersion *pDynViewVer = NULL; + + code = catalogGetExpiredViews(pCatalog, &views, &viewNum, &pDynViewVer); + if (TSDB_CODE_SUCCESS != code) { + taosMemoryFree(views); + taosMemoryFree(pDynViewVer); + return code; + } + + if (viewNum <= 0) { + taosMemoryFree(views); + taosMemoryFree(pDynViewVer); + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < viewNum; ++i) { + SViewVersion *view = &views[i]; + view->viewId = htobe64(view->viewId); + view->version = htonl(view->version); + } + + tscDebug("hb got %d expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum); + + if (NULL == req->info) { + req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + } + + SKv kv = { + .key = HEARTBEAT_KEY_DYN_VIEW, + .valueLen = sizeof(SDynViewVersion), + .value = pDynViewVer, + }; + + taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + + kv.key = HEARTBEAT_KEY_VIEWINFO; + kv.valueLen = sizeof(SViewVersion) * viewNum; + kv.value = views; + + taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + + return TSDB_CODE_SUCCESS; +} + + int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); if (NULL != pApp) { @@ -781,19 +830,17 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req SHbParam *hbParam = (SHbParam *)param; SCatalog *pCatalog = NULL; + hbGetQueryBasicInfo(connKey, req); + if (hbParam->reqCnt == 0) { code = catalogGetHandle(hbParam->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); return code; } - } - hbGetAppInfo(hbParam->clusterId, req); + hbGetAppInfo(hbParam->clusterId, req); - hbGetQueryBasicInfo(connKey, req); - - if (hbParam->reqCnt == 0) { if (!taosHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId))) { code = hbGetExpiredUserInfo(connKey, pCatalog, req); if (TSDB_CODE_SUCCESS != code) { @@ -819,6 +866,15 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req if (TSDB_CODE_SUCCESS != code) { return code; } + +#ifdef TD_ENTERPRISE + code = hbGetExpiredViewInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } +#endif + } else { + req->app.appId = 0; } ++hbParam->reqCnt; // success to get catalog info diff --git a/source/dnode/mnode/impl/inc/mndView.h b/source/dnode/mnode/impl/inc/mndView.h index dde287ac14..4ca209fbd6 100755 --- a/source/dnode/mnode/impl/inc/mndView.h +++ b/source/dnode/mnode/impl/inc/mndView.h @@ -31,6 +31,9 @@ int32_t mndProcessGetViewMetaReq(SRpcMsg *pReq); #ifdef TD_ENTERPRISE + +void initDynViewVersion(void); + SViewObj *mndAcquireView(SMnode *pMnode, char *viewName); void mndReleaseView(SMnode *pMnode, SViewObj *pView); @@ -45,6 +48,9 @@ int32_t mndProcessDropViewReqImpl(SCMDropViewReq* pDropView, SRpcMsg *pReq); int32_t mndProcessViewMetaReqImpl(SViewMetaReq* pMetaReq, SRpcMsg *pReq); int32_t mndRetrieveViewImpl(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); int32_t mndDropViewByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); +void mndValidateDynViewVersion(SMnode *pMnode, SDynViewVersion* pDynViewVer, bool *needCheck); +int32_t mndValidateViewInfo(SMnode *pMnode, SViewVersion *pViewVersions, int32_t numOfViews, void **ppRsp, + int32_t *pRspLen); #endif diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 6f67778615..6012da077a 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -23,6 +23,7 @@ #include "mndShow.h" #include "mndStb.h" #include "mndUser.h" +#include "mndView.h" #include "tglobal.h" #include "tversion.h" #include "audit.h" @@ -466,7 +467,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL}; SRpcConnInfo connInfo = pMsg->info.conn; - mndUpdateAppInfo(pMnode, pHbReq, &connInfo); + if (0 != pHbReq->app.appId) { + mndUpdateAppInfo(pMnode, pHbReq, &connInfo); + } if (pHbReq->query) { SQueryHbReqBasic *pBasic = pHbReq->query; @@ -529,6 +532,21 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb return -1; } +#ifdef TD_ENTERPRISE + bool needCheck = true; + int32_t key = HEARTBEAT_KEY_DYN_VIEW; + SDynViewVersion* pDynViewVer = NULL; + SKv* pKv = taosHashGet(pHbReq->info, &key, sizeof(key)); + if (NULL != pKv) { + pDynViewVer = pKv->value; + mndValidateDynViewVersion(pMnode, pDynViewVer, &needCheck); + if (needCheck) { + SKv kv1 = {.key = HEARTBEAT_KEY_DYN_VIEW, .valueLen = sizeof(*pDynViewVer), .value = pDynViewVer}; + taosArrayPush(hbRsp.info, &kv1); + } + } +#endif + void *pIter = taosHashIterate(pHbReq->info, NULL); while (pIter != NULL) { SKv *kv = pIter; @@ -564,6 +582,25 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb } break; } +#ifdef TD_ENTERPRISE + case HEARTBEAT_KEY_DYN_VIEW: { + break; + } + case HEARTBEAT_KEY_VIEWINFO: { + if (!needCheck) { + break; + } + + void *rspMsg = NULL; + int32_t rspLen = 0; + mndValidateViewInfo(pMnode, kv->value, kv->valueLen / sizeof(SViewVersion), &rspMsg, &rspLen); + if (rspMsg && rspLen > 0) { + SKv kv1 = {.key = HEARTBEAT_KEY_VIEWINFO, .valueLen = rspLen, .value = rspMsg}; + taosArrayPush(hbRsp.info, &kv1); + } + break; + } +#endif default: mError("invalid kv key:%d", kv->key); hbRsp.status = TSDB_CODE_APP_ERROR; diff --git a/source/dnode/mnode/impl/src/mndView.c b/source/dnode/mnode/impl/src/mndView.c index bd147d90a3..d53e740736 100755 --- a/source/dnode/mnode/impl/src/mndView.c +++ b/source/dnode/mnode/impl/src/mndView.c @@ -25,6 +25,8 @@ int32_t mndInitView(SMnode *pMnode) { mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndCancelGetNextView); #ifdef TD_ENTERPRISE + initDynViewVersion(); + SSdbTable table = { .sdbType = SDB_VIEW, .keyType = SDB_KEY_BINARY, diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index b5dcc0ba84..439e052ffc 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -314,14 +314,15 @@ typedef struct SCtgUserAuth { } SCtgUserAuth; typedef struct SCatalog { - uint64_t clusterId; - bool stopUpdate; - SHashObj* userCache; // key:user, value:SCtgUserAuth - SHashObj* dbCache; // key:dbname, value:SCtgDBCache - SCtgRentMgmt dbRent; - SCtgRentMgmt stbRent; - SCtgRentMgmt viewRent; - SCtgCacheStat cacheStat; + uint64_t clusterId; + bool stopUpdate; + SDynViewVersion dynViewVer; + SHashObj* userCache; // key:user, value:SCtgUserAuth + SHashObj* dbCache; // key:dbname, value:SCtgDBCache + SCtgRentMgmt dbRent; + SCtgRentMgmt stbRent; + SCtgRentMgmt viewRent; + SCtgCacheStat cacheStat; } SCatalog; typedef struct SCtgBatch { @@ -1030,7 +1031,7 @@ void ctgClearHandle(SCatalog* pCtg); void ctgFreeTbCacheImpl(SCtgTbCache* pCache, bool lock); void ctgFreeViewCacheImpl(SCtgViewCache* pCache, bool lock); int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName); -int32_t ctgRemoveCacheUser(SCatalog* pCtg, const char* user); +int32_t ctgRemoveCacheUser(SCatalog* pCtg, SCtgUserAuth* pUser, const char* user); int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists); SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index bf5078c854..a21089fa2d 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1492,6 +1492,23 @@ int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableVersion** stables, uint3 CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void**)stables, num, sizeof(SSTableVersion))); } +int32_t catalogGetExpiredViews(SCatalog* pCtg, SViewVersion** views, uint32_t* num, SDynViewVersion** dynViewVersion) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == views || NULL == num) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + *dynViewVersion = taosMemoryMalloc(sizeof(SDynViewVersion)); + if (NULL == *dynViewVersion) { + CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY); + } + **dynViewVersion = pCtg->dynViewVer; + + CTG_API_LEAVE(ctgMetaRentGet(&pCtg->viewRent, (void**)views, num, sizeof(SViewVersion))); +} + + int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbCacheInfo** dbs, uint32_t* num) { CTG_API_ENTER(); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index f6fab21816..8cca214de7 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -2155,7 +2155,7 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) { return TSDB_CODE_SUCCESS; } else if (msg->userAuth.dropped == 1) { - if (ctgRemoveCacheUser(pCtg, msg->userAuth.user) == 0) { + if (ctgRemoveCacheUser(pCtg, pUser, msg->userAuth.user) == 0) { CTG_CACHE_NUM_DEC(CTG_CI_USER, 1); } goto _return; diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index a8948b6eaf..aad8a6f7c0 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -338,17 +338,13 @@ void ctgFreeHandleImpl(SCatalog* pCtg) { taosMemoryFree(pCtg); } -int32_t ctgRemoveCacheUser(SCatalog* pCtg, const char* user) { - if (!pCtg || !user) { - return -1; - } - - SCtgUserAuth* pUser = (SCtgUserAuth*)taosHashGet(pCtg->userCache, user, strlen(user)); - if (pUser) { - ctgFreeSCtgUserAuth(pUser); - if (taosHashRemove(pCtg->userCache, user, strlen(user)) == 0) { - return 0; // user found and removed - } +int32_t ctgRemoveCacheUser(SCatalog* pCtg, SCtgUserAuth* pUser, const char* user) { + CTG_LOCK(CTG_WRITE, &pUser->lock); + ctgFreeSCtgUserAuth(pUser); + CTG_UNLOCK(CTG_WRITE, &pUser->lock); + + if (taosHashRemove(pCtg->userCache, user, strlen(user)) == 0) { + return 0; // user found and removed } return -1; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 21b924f056..d7d1ee0de6 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -349,7 +349,7 @@ static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STa if (pParCxt->async) { code = getTableMetaFromCache(pCxt->pMetaCache, pName, pMeta); #ifdef TD_ENTERPRISE - if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) { + if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code && couldBeView) { int32_t origCode = code; code = getViewMetaFromCache(pCxt->pMetaCache, pName, pMeta); if (TSDB_CODE_SUCCESS != code) {