feat: update view meta

This commit is contained in:
dapan1121 2023-10-12 19:56:17 +08:00
parent f5768494b4
commit 7c67f23469
11 changed files with 154 additions and 30 deletions

View File

@ -107,6 +107,8 @@ enum {
HEARTBEAT_KEY_DBINFO, HEARTBEAT_KEY_DBINFO,
HEARTBEAT_KEY_STBINFO, HEARTBEAT_KEY_STBINFO,
HEARTBEAT_KEY_TMQ, HEARTBEAT_KEY_TMQ,
HEARTBEAT_KEY_DYN_VIEW,
HEARTBEAT_KEY_VIEWINFO,
}; };
typedef enum _mgmt_table { typedef enum _mgmt_table {

View File

@ -145,6 +145,11 @@ typedef struct SDbCacheInfo {
int64_t stateTs; int64_t stateTs;
} SDbCacheInfo; } SDbCacheInfo;
typedef struct SDynViewVersion {
int64_t svrBootTs;
uint64_t dynViewVer;
} SDynViewVersion;
typedef struct SViewVersion { typedef struct SViewVersion {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
char viewName[TSDB_VIEW_NAME_LEN]; char viewName[TSDB_VIEW_NAME_LEN];
@ -322,6 +327,8 @@ int32_t catalogGetDnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray*
int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableVersion** stables, uint32_t* num); int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableVersion** stables, uint32_t* num);
int32_t catalogGetExpiredViews(SCatalog* pCtg, SViewVersion** views, uint32_t* num, SDynViewVersion** dynViewVersion);
int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbCacheInfo** dbs, uint32_t* num); int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbCacheInfo** dbs, uint32_t* num);
int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion** users, uint32_t* num); int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion** users, uint32_t* num);

View File

@ -740,8 +740,8 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
for (int32_t i = 0; i < stbNum; ++i) { for (int32_t i = 0; i < stbNum; ++i) {
SSTableVersion *stb = &stbs[i]; SSTableVersion *stb = &stbs[i];
stb->suid = htobe64(stb->suid); stb->suid = htobe64(stb->suid);
stb->sversion = htons(stb->sversion); stb->sversion = htonl(stb->sversion);
stb->tversion = htons(stb->tversion); stb->tversion = htonl(stb->tversion);
stb->smaVer = htonl(stb->smaVer); stb->smaVer = htonl(stb->smaVer);
} }
@ -762,6 +762,55 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
SViewVersion *views = NULL;
uint32_t viewNum = 0;
int32_t code = 0;
SDynViewVersion *pDynViewVer = NULL;
code = catalogGetExpiredViews(pCatalog, &views, &viewNum, &pDynViewVer);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(views);
taosMemoryFree(pDynViewVer);
return code;
}
if (viewNum <= 0) {
taosMemoryFree(views);
taosMemoryFree(pDynViewVer);
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < viewNum; ++i) {
SViewVersion *view = &views[i];
view->viewId = htobe64(view->viewId);
view->version = htonl(view->version);
}
tscDebug("hb got %d expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum);
if (NULL == req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
}
SKv kv = {
.key = HEARTBEAT_KEY_DYN_VIEW,
.valueLen = sizeof(SDynViewVersion),
.value = pDynViewVer,
};
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
kv.key = HEARTBEAT_KEY_VIEWINFO;
kv.valueLen = sizeof(SViewVersion) * viewNum;
kv.value = views;
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
return TSDB_CODE_SUCCESS;
}
int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
if (NULL != pApp) { if (NULL != pApp) {
@ -781,19 +830,17 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
SHbParam *hbParam = (SHbParam *)param; SHbParam *hbParam = (SHbParam *)param;
SCatalog *pCatalog = NULL; SCatalog *pCatalog = NULL;
hbGetQueryBasicInfo(connKey, req);
if (hbParam->reqCnt == 0) { if (hbParam->reqCnt == 0) {
code = catalogGetHandle(hbParam->clusterId, &pCatalog); code = catalogGetHandle(hbParam->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code; return code;
} }
}
hbGetAppInfo(hbParam->clusterId, req); hbGetAppInfo(hbParam->clusterId, req);
hbGetQueryBasicInfo(connKey, req);
if (hbParam->reqCnt == 0) {
if (!taosHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId))) { if (!taosHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId))) {
code = hbGetExpiredUserInfo(connKey, pCatalog, req); code = hbGetExpiredUserInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
@ -819,6 +866,15 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
#ifdef TD_ENTERPRISE
code = hbGetExpiredViewInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
#endif
} else {
req->app.appId = 0;
} }
++hbParam->reqCnt; // success to get catalog info ++hbParam->reqCnt; // success to get catalog info

View File

@ -31,6 +31,9 @@ int32_t mndProcessGetViewMetaReq(SRpcMsg *pReq);
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
void initDynViewVersion(void);
SViewObj *mndAcquireView(SMnode *pMnode, char *viewName); SViewObj *mndAcquireView(SMnode *pMnode, char *viewName);
void mndReleaseView(SMnode *pMnode, SViewObj *pView); void mndReleaseView(SMnode *pMnode, SViewObj *pView);
@ -45,6 +48,9 @@ int32_t mndProcessDropViewReqImpl(SCMDropViewReq* pDropView, SRpcMsg *pReq);
int32_t mndProcessViewMetaReqImpl(SViewMetaReq* pMetaReq, SRpcMsg *pReq); int32_t mndProcessViewMetaReqImpl(SViewMetaReq* pMetaReq, SRpcMsg *pReq);
int32_t mndRetrieveViewImpl(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); int32_t mndRetrieveViewImpl(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
int32_t mndDropViewByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndDropViewByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
void mndValidateDynViewVersion(SMnode *pMnode, SDynViewVersion* pDynViewVer, bool *needCheck);
int32_t mndValidateViewInfo(SMnode *pMnode, SViewVersion *pViewVersions, int32_t numOfViews, void **ppRsp,
int32_t *pRspLen);
#endif #endif

View File

@ -23,6 +23,7 @@
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndView.h"
#include "tglobal.h" #include "tglobal.h"
#include "tversion.h" #include "tversion.h"
#include "audit.h" #include "audit.h"
@ -466,7 +467,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL}; SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
SRpcConnInfo connInfo = pMsg->info.conn; SRpcConnInfo connInfo = pMsg->info.conn;
mndUpdateAppInfo(pMnode, pHbReq, &connInfo); if (0 != pHbReq->app.appId) {
mndUpdateAppInfo(pMnode, pHbReq, &connInfo);
}
if (pHbReq->query) { if (pHbReq->query) {
SQueryHbReqBasic *pBasic = pHbReq->query; SQueryHbReqBasic *pBasic = pHbReq->query;
@ -529,6 +532,21 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
return -1; return -1;
} }
#ifdef TD_ENTERPRISE
bool needCheck = true;
int32_t key = HEARTBEAT_KEY_DYN_VIEW;
SDynViewVersion* pDynViewVer = NULL;
SKv* pKv = taosHashGet(pHbReq->info, &key, sizeof(key));
if (NULL != pKv) {
pDynViewVer = pKv->value;
mndValidateDynViewVersion(pMnode, pDynViewVer, &needCheck);
if (needCheck) {
SKv kv1 = {.key = HEARTBEAT_KEY_DYN_VIEW, .valueLen = sizeof(*pDynViewVer), .value = pDynViewVer};
taosArrayPush(hbRsp.info, &kv1);
}
}
#endif
void *pIter = taosHashIterate(pHbReq->info, NULL); void *pIter = taosHashIterate(pHbReq->info, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SKv *kv = pIter; SKv *kv = pIter;
@ -564,6 +582,25 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
} }
break; break;
} }
#ifdef TD_ENTERPRISE
case HEARTBEAT_KEY_DYN_VIEW: {
break;
}
case HEARTBEAT_KEY_VIEWINFO: {
if (!needCheck) {
break;
}
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateViewInfo(pMnode, kv->value, kv->valueLen / sizeof(SViewVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv1 = {.key = HEARTBEAT_KEY_VIEWINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
#endif
default: default:
mError("invalid kv key:%d", kv->key); mError("invalid kv key:%d", kv->key);
hbRsp.status = TSDB_CODE_APP_ERROR; hbRsp.status = TSDB_CODE_APP_ERROR;

View File

@ -25,6 +25,8 @@ int32_t mndInitView(SMnode *pMnode) {
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndCancelGetNextView); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndCancelGetNextView);
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
initDynViewVersion();
SSdbTable table = { SSdbTable table = {
.sdbType = SDB_VIEW, .sdbType = SDB_VIEW,
.keyType = SDB_KEY_BINARY, .keyType = SDB_KEY_BINARY,

View File

@ -314,14 +314,15 @@ typedef struct SCtgUserAuth {
} SCtgUserAuth; } SCtgUserAuth;
typedef struct SCatalog { typedef struct SCatalog {
uint64_t clusterId; uint64_t clusterId;
bool stopUpdate; bool stopUpdate;
SHashObj* userCache; // key:user, value:SCtgUserAuth SDynViewVersion dynViewVer;
SHashObj* dbCache; // key:dbname, value:SCtgDBCache SHashObj* userCache; // key:user, value:SCtgUserAuth
SCtgRentMgmt dbRent; SHashObj* dbCache; // key:dbname, value:SCtgDBCache
SCtgRentMgmt stbRent; SCtgRentMgmt dbRent;
SCtgRentMgmt viewRent; SCtgRentMgmt stbRent;
SCtgCacheStat cacheStat; SCtgRentMgmt viewRent;
SCtgCacheStat cacheStat;
} SCatalog; } SCatalog;
typedef struct SCtgBatch { typedef struct SCtgBatch {
@ -1030,7 +1031,7 @@ void ctgClearHandle(SCatalog* pCtg);
void ctgFreeTbCacheImpl(SCtgTbCache* pCache, bool lock); void ctgFreeTbCacheImpl(SCtgTbCache* pCache, bool lock);
void ctgFreeViewCacheImpl(SCtgViewCache* pCache, bool lock); void ctgFreeViewCacheImpl(SCtgViewCache* pCache, bool lock);
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName); int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName);
int32_t ctgRemoveCacheUser(SCatalog* pCtg, const char* user); int32_t ctgRemoveCacheUser(SCatalog* pCtg, SCtgUserAuth* pUser, const char* user);
int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup, int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup,
bool* exists); bool* exists);
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch); SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch);

View File

@ -1492,6 +1492,23 @@ int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableVersion** stables, uint3
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void**)stables, num, sizeof(SSTableVersion))); CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void**)stables, num, sizeof(SSTableVersion)));
} }
int32_t catalogGetExpiredViews(SCatalog* pCtg, SViewVersion** views, uint32_t* num, SDynViewVersion** dynViewVersion) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == views || NULL == num) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
*dynViewVersion = taosMemoryMalloc(sizeof(SDynViewVersion));
if (NULL == *dynViewVersion) {
CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
}
**dynViewVersion = pCtg->dynViewVer;
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->viewRent, (void**)views, num, sizeof(SViewVersion)));
}
int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbCacheInfo** dbs, uint32_t* num) { int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbCacheInfo** dbs, uint32_t* num) {
CTG_API_ENTER(); CTG_API_ENTER();

View File

@ -2155,7 +2155,7 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (msg->userAuth.dropped == 1) { } else if (msg->userAuth.dropped == 1) {
if (ctgRemoveCacheUser(pCtg, msg->userAuth.user) == 0) { if (ctgRemoveCacheUser(pCtg, pUser, msg->userAuth.user) == 0) {
CTG_CACHE_NUM_DEC(CTG_CI_USER, 1); CTG_CACHE_NUM_DEC(CTG_CI_USER, 1);
} }
goto _return; goto _return;

View File

@ -338,17 +338,13 @@ void ctgFreeHandleImpl(SCatalog* pCtg) {
taosMemoryFree(pCtg); taosMemoryFree(pCtg);
} }
int32_t ctgRemoveCacheUser(SCatalog* pCtg, const char* user) { int32_t ctgRemoveCacheUser(SCatalog* pCtg, SCtgUserAuth* pUser, const char* user) {
if (!pCtg || !user) { CTG_LOCK(CTG_WRITE, &pUser->lock);
return -1; ctgFreeSCtgUserAuth(pUser);
} CTG_UNLOCK(CTG_WRITE, &pUser->lock);
SCtgUserAuth* pUser = (SCtgUserAuth*)taosHashGet(pCtg->userCache, user, strlen(user)); if (taosHashRemove(pCtg->userCache, user, strlen(user)) == 0) {
if (pUser) { return 0; // user found and removed
ctgFreeSCtgUserAuth(pUser);
if (taosHashRemove(pCtg->userCache, user, strlen(user)) == 0) {
return 0; // user found and removed
}
} }
return -1; return -1;

View File

@ -349,7 +349,7 @@ static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STa
if (pParCxt->async) { if (pParCxt->async) {
code = getTableMetaFromCache(pCxt->pMetaCache, pName, pMeta); code = getTableMetaFromCache(pCxt->pMetaCache, pName, pMeta);
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) { if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code && couldBeView) {
int32_t origCode = code; int32_t origCode = code;
code = getViewMetaFromCache(pCxt->pMetaCache, pName, pMeta); code = getViewMetaFromCache(pCxt->pMetaCache, pName, pMeta);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {