diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 3133e3b300..ee5b5c4cf5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1791,6 +1791,15 @@ int32_t tSerializeSSTbHbRsp(void* buf, int32_t bufLen, SSTbHbRsp* pRsp); int32_t tDeserializeSSTbHbRsp(void* buf, int32_t bufLen, SSTbHbRsp* pRsp); void tFreeSSTbHbRsp(SSTbHbRsp* pRsp); +typedef struct { + SArray* pViewRsp; // Array of SViewMetaRsp; +} SViewHbRsp; + +int32_t tSerializeSViewHbRsp(void* buf, int32_t bufLen, SViewHbRsp* pRsp); +int32_t tDeserializeSViewHbRsp(void* buf, int32_t bufLen, SViewHbRsp* pRsp); +void tFreeSViewHbRsp(SViewHbRsp* pRsp); + + typedef struct { int32_t numOfTables; int32_t numOfVgroup; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 7877e0accf..e018ac0efd 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -119,6 +119,7 @@ typedef struct SMetaData { typedef struct SCatalogCfg { uint32_t maxTblCacheNum; + uint32_t maxViewCacheNum; uint32_t maxDBCacheNum; uint32_t maxUserCacheNum; uint32_t dbRentSec; @@ -365,7 +366,13 @@ SMetaData* catalogCloneMetaData(SMetaData* pData); void catalogFreeMetaData(SMetaData* pData); -int32_t catalogRemoveViewMeta(SCatalog* pCtg, SName* pViewName); +int32_t catalogRemoveViewMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* viewName, uint64_t viewId); + +int32_t catalogUpdateDynViewVer(SCatalog* pCtg, SDynViewVersion* pVer); + +int32_t catalogUpdateViewMeta(SCatalog* pCtg, SViewMetaRsp* pMsg); + +int32_t catalogAsyncUpdateViewMeta(SCatalog* pCtg, SViewMetaRsp* pMsg); int32_t ctgdEnableDebug(char* option, bool enable); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 90c20b9d61..62c1505ff0 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -53,7 +53,7 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray); for (int32_t i = 0; i < numOfBatchs; ++i) { SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i); - tscDebug("hb user auth rsp, user:%s, version:%d", rsp->user, rsp->version); + tscDebug("hb to update user auth, user:%s, version:%d", rsp->user, rsp->version); catalogUpdateUserAuthInfo(pCatalog, rsp); } @@ -205,6 +205,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog rsp->useDbRsp->db, rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid); if (rsp->useDbRsp->vgVersion < 0) { + tscDebug("hb to remove db, db:%s", rsp->useDbRsp->db); code = catalogRemoveDB(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid); } else { SDBVgInfo *vgInfo = NULL; @@ -213,6 +214,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog goto _return; } + tscDebug("hb to update db vgInfo, db:%s", rsp->useDbRsp->db); + catalogUpdateDBVgInfo(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid, vgInfo); if (IS_SYS_DBNAME(rsp->useDbRsp->db)) { @@ -253,10 +256,10 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i); if (rsp->numOfColumns < 0) { - tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); + tscDebug("hb to remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid); } else { - tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); + tscDebug("hb to update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) { tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId); tFreeSSTbHbRsp(&hbRsp); @@ -281,6 +284,96 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo return TSDB_CODE_SUCCESS; } + +static int32_t hbProcessDynViewRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { + return catalogUpdateDynViewVer(pCatalog, (SDynViewVersion*)value); +} + +static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { + int32_t code = 0; + + SViewHbRsp hbRsp = {0}; + if (tDeserializeSViewHbRsp(value, valueLen, &hbRsp) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + int32_t numOfMeta = taosArrayGetSize(hbRsp.pViewRsp); + for (int32_t i = 0; i < numOfMeta; ++i) { + SViewMetaRsp *rsp = taosArrayGet(hbRsp.pViewRsp, i); + + if (rsp->numOfCols < 0) { + tscDebug("hb to remove view, db:%s, view:%s", rsp->dbFName, rsp->name); + catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId); + } else { + tscDebug("hb to update view, db:%s, view:%s", rsp->dbFName, rsp->name); + catalogUpdateViewMeta(pCatalog, rsp); + } + } + + tFreeSViewHbRsp(&hbRsp); + return TSDB_CODE_SUCCESS; +} + + +static void hbProcessQueryRspKvs(int32_t kvNum, SArray* pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) { + for (int32_t i = 0; i < kvNum; ++i) { + SKv *kv = taosArrayGet(pKvs, i); + switch (kv->key) { + case HEARTBEAT_KEY_USER_AUTHINFO: { + if (kv->valueLen <= 0 || NULL == kv->value) { + tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value); + break; + } + + hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr); + break; + } + case HEARTBEAT_KEY_DBINFO: { + if (kv->valueLen <= 0 || NULL == kv->value) { + tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value); + break; + } + + hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog); + break; + } + case HEARTBEAT_KEY_STBINFO: { + if (kv->valueLen <= 0 || NULL == kv->value) { + tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value); + break; + } + + hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog); + break; + } +#ifdef TD_ENTERPRISE + case HEARTBEAT_KEY_DYN_VIEW: { + if (kv->valueLen <= 0 || NULL == kv->value) { + tscError("invalid dyn view info, len:%d, value:%p", kv->valueLen, kv->value); + break; + } + + hbProcessDynViewRsp(kv->value, kv->valueLen, pCatalog); + break; + } + case HEARTBEAT_KEY_VIEWINFO: { + if (kv->valueLen <= 0 || NULL == kv->value) { + tscError("invalid view info, len:%d, value:%p", kv->valueLen, kv->value); + break; + } + + hbProcessViewInfoRsp(kv->value, kv->valueLen, pCatalog); + break; + } +#endif + default: + tscError("invalid hb key type:%d", kv->key); + break; + } + } +} + static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey)); if (NULL == pReq) { @@ -338,66 +431,16 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { tscDebug("hb got %d rsp kv", kvNum); - for (int32_t i = 0; i < kvNum; ++i) { - SKv *kv = taosArrayGet(pRsp->info, i); - switch (kv->key) { - case HEARTBEAT_KEY_USER_AUTHINFO: { - if (kv->valueLen <= 0 || NULL == kv->value) { - tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value); - break; - } - - struct SCatalog *pCatalog = NULL; - - int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code)); - break; - } - - hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr); - break; - } - case HEARTBEAT_KEY_DBINFO: { - if (kv->valueLen <= 0 || NULL == kv->value) { - tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value); - break; - } - - struct SCatalog *pCatalog = NULL; - - int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code)); - break; - } - - hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog); - break; - } - case HEARTBEAT_KEY_STBINFO: { - if (kv->valueLen <= 0 || NULL == kv->value) { - tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value); - break; - } - - struct SCatalog *pCatalog = NULL; - - int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code)); - break; - } - - hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog); - break; - } - default: - tscError("invalid hb key type:%d", kv->key); - break; + if (kvNum > 0) { + struct SCatalog *pCatalog = NULL; + int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code)); + } else { + hbProcessQueryRspKvs(kvNum, pRsp->info, pCatalog, pAppHbMgr); } } - + taosHashRelease(pAppHbMgr->activeInfo, pReq); return TSDB_CODE_SUCCESS; @@ -783,6 +826,7 @@ int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S for (int32_t i = 0; i < viewNum; ++i) { SViewVersion *view = &views[i]; + view->dbId = htobe64(view->dbId); view->viewId = htobe64(view->viewId); view->version = htonl(view->version); } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 1028b6a3a3..36b3a31fef 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -8621,24 +8621,31 @@ int32_t tDeserializeSViewMetaReq(void* buf, int32_t bufLen, SViewMetaReq* pReq) return 0; } +static int32_t tEncodeSViewMetaRsp(SEncoder *pEncoder, const SViewMetaRsp *pRsp) { + if (tEncodeCStr(pEncoder, pRsp->name) < 0) return -1; + if (tEncodeCStr(pEncoder, pRsp->dbFName) < 0) return -1; + if (tEncodeU64(pEncoder, pRsp->dbId) < 0) return -1; + if (tEncodeU64(pEncoder, pRsp->viewId) < 0) return -1; + if (tEncodeCStr(pEncoder, pRsp->querySql) < 0) return -1; + if (tEncodeI8(pEncoder, pRsp->precision) < 0) return -1; + if (tEncodeI8(pEncoder, pRsp->type) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->version) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->numOfCols) < 0) return -1; + for (int32_t i = 0; i < pRsp->numOfCols; ++i) { + SSchema *pSchema = &pRsp->pSchema[i]; + if (tEncodeSSchema(pEncoder, pSchema) < 0) return -1; + } + + return 0; +} + + int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeCStr(&encoder, pRsp->name) < 0) return -1; - if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1; - if (tEncodeU64(&encoder, pRsp->dbId) < 0) return -1; - if (tEncodeU64(&encoder, pRsp->viewId) < 0) return -1; - if (tEncodeCStr(&encoder, pRsp->querySql) < 0) return -1; - if (tEncodeI8(&encoder, pRsp->precision) < 0) return -1; - if (tEncodeI8(&encoder, pRsp->type) < 0) return -1; - if (tEncodeI32(&encoder, pRsp->version) < 0) return -1; - if (tEncodeI32(&encoder, pRsp->numOfCols) < 0) return -1; - for (int32_t i = 0; i < pRsp->numOfCols; ++i) { - SSchema *pSchema = &pRsp->pSchema[i]; - if (tEncodeSSchema(&encoder, pSchema) < 0) return -1; - } + if (tEncodeSViewMetaRsp(&encoder, pRsp) < 0) return -1; tEndEncode(&encoder); @@ -8647,20 +8654,16 @@ int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pR return tlen; } -int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp) { - SDecoder decoder = {0}; - tDecoderInit(&decoder, buf, bufLen); - - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeCStrTo(&decoder, pRsp->name) < 0) return -1; - if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1; - if (tDecodeU64(&decoder, &pRsp->dbId) < 0) return -1; - if (tDecodeU64(&decoder, &pRsp->viewId) < 0) return -1; - if (tDecodeCStrAlloc(&decoder, &pRsp->querySql) < 0) return -1; - if (tDecodeI8(&decoder, &pRsp->precision) < 0) return -1; - if (tDecodeI8(&decoder, &pRsp->type) < 0) return -1; - if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1; - if (tDecodeI32(&decoder, &pRsp->numOfCols) < 0) return -1; +static int32_t tDecodeSViewMetaRsp(SDecoder *pDecoder, SViewMetaRsp *pRsp) { + if (tDecodeCStrTo(pDecoder, pRsp->name) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pRsp->dbFName) < 0) return -1; + if (tDecodeU64(pDecoder, &pRsp->dbId) < 0) return -1; + if (tDecodeU64(pDecoder, &pRsp->viewId) < 0) return -1; + if (tDecodeCStrAlloc(pDecoder, &pRsp->querySql) < 0) return -1; + if (tDecodeI8(pDecoder, &pRsp->precision) < 0) return -1; + if (tDecodeI8(pDecoder, &pRsp->type) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->version) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->numOfCols) < 0) return -1; if (pRsp->numOfCols > 0) { pRsp->pSchema = taosMemoryCalloc(pRsp->numOfCols, sizeof(SSchema)); if (pRsp->pSchema == NULL) { @@ -8670,10 +8673,20 @@ int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp) for (int32_t i = 0; i < pRsp->numOfCols; ++i) { SSchema* pSchema = pRsp->pSchema + i; - if (tDecodeSSchema(&decoder, pSchema) < 0) return -1; + if (tDecodeSSchema(pDecoder, pSchema) < 0) return -1; } } + return 0; +} + +int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeSViewMetaRsp(&decoder, pRsp) < 0) return -1; + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -8689,4 +8702,62 @@ void tFreeSViewMetaRsp(SViewMetaRsp* pRsp) { taosMemoryFree(pRsp->pSchema); } +int32_t tSerializeSViewHbRsp(void *buf, int32_t bufLen, SViewHbRsp *pRsp) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + + int32_t numOfMeta = taosArrayGetSize(pRsp->pViewRsp); + if (tEncodeI32(&encoder, numOfMeta) < 0) return -1; + for (int32_t i = 0; i < numOfMeta; ++i) { + SViewMetaRsp *pMetaRsp = taosArrayGet(pRsp->pViewRsp, i); + if (tEncodeSViewMetaRsp(&encoder, pMetaRsp) < 0) return -1; + } + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSViewHbRsp(void *buf, int32_t bufLen, SViewHbRsp *pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + int32_t numOfMeta = 0; + if (tDecodeI32(&decoder, &numOfMeta) < 0) return -1; + pRsp->pViewRsp = taosArrayInit(numOfMeta, sizeof(SViewMetaRsp)); + if (pRsp->pViewRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + for (int32_t i = 0; i < numOfMeta; ++i) { + SViewMetaRsp metaRsp = {0}; + if (tDecodeSViewMetaRsp(&decoder, &metaRsp) < 0) return -1; + taosArrayPush(pRsp->pViewRsp, &metaRsp); + } + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +void tFreeSViewHbRsp(SViewHbRsp *pRsp) { + int32_t numOfMeta = taosArrayGetSize(pRsp->pViewRsp); + for (int32_t i = 0; i < numOfMeta; ++i) { + SViewMetaRsp *pMetaRsp = taosArrayGet(pRsp->pViewRsp, i); + tFreeSViewMetaRsp(pMetaRsp); + } + + taosArrayDestroy(pRsp->pViewRsp); +} + + + diff --git a/source/dnode/mnode/impl/inc/mndView.h b/source/dnode/mnode/impl/inc/mndView.h index 4ca209fbd6..7ff5f0a763 100755 --- a/source/dnode/mnode/impl/inc/mndView.h +++ b/source/dnode/mnode/impl/inc/mndView.h @@ -48,7 +48,7 @@ int32_t mndProcessDropViewReqImpl(SCMDropViewReq* pDropView, SRpcMsg *pReq); int32_t mndProcessViewMetaReqImpl(SViewMetaReq* pMetaReq, SRpcMsg *pReq); int32_t mndRetrieveViewImpl(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); int32_t mndDropViewByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); -void mndValidateDynViewVersion(SMnode *pMnode, SDynViewVersion* pDynViewVer, bool *needCheck); +int32_t mndValidateDynViewVersion(SMnode *pMnode, SDynViewVersion* pReqVer, bool *needCheck, SDynViewVersion** ppRspVer); int32_t mndValidateViewInfo(SMnode *pMnode, SViewVersion *pViewVersions, int32_t numOfViews, void **ppRsp, int32_t *pRspLen); diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 6012da077a..118e090b78 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -539,10 +539,17 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb SKv* pKv = taosHashGet(pHbReq->info, &key, sizeof(key)); if (NULL != pKv) { pDynViewVer = pKv->value; - mndValidateDynViewVersion(pMnode, pDynViewVer, &needCheck); + mTrace("recv view dyn ver, bootTs:%" PRId64 ", ver:%" PRIu64, pDynViewVer->svrBootTs, pDynViewVer->dynViewVer); + + SDynViewVersion* pRspVer = NULL; + if (0 != mndValidateDynViewVersion(pMnode, pDynViewVer, &needCheck, &pRspVer)) { + return -1; + } + if (needCheck) { - SKv kv1 = {.key = HEARTBEAT_KEY_DYN_VIEW, .valueLen = sizeof(*pDynViewVer), .value = pDynViewVer}; + SKv kv1 = {.key = HEARTBEAT_KEY_DYN_VIEW, .valueLen = sizeof(*pDynViewVer), .value = pRspVer}; taosArrayPush(hbRsp.info, &kv1); + mTrace("need to check view ver, lastest bootTs:%" PRId64 ", ver:%" PRIu64, pRspVer->svrBootTs, pRspVer->dynViewVer); } } #endif diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index c47c4994b7..e446f16f3c 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -2000,8 +2000,51 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, return 0; } -static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp, - int32_t *smaVer) { +static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion* pStbVer, bool* schema, bool* sma) { + char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; + snprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVer->dbFName, pStbVer->stbName); + + SDbObj *pDb = mndAcquireDb(pMnode, pStbVer->dbFName); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + return -1; + } + + if (pDb->uid != pStbVer->dbId) { + mndReleaseDb(pMnode, pDb); + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + return -1; + } + + SStbObj *pStb = mndAcquireStb(pMnode, tbFName); + if (pStb == NULL) { + mndReleaseDb(pMnode, pDb); + terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; + return -1; + } + + taosRLockLatch(&pStb->lock); + + if (pStbVer->sversion != pStb->colVer || pStbVer->tversion != pStb->tagVer) { + *schema = true; + } else { + *schema = false; + } + + if (pStbVer->smaVer && pStbVer->smaVer != pStb->smaVer) { + *sma = true; + } else { + *sma = false; + } + + taosRUnLockLatch(&pStb->lock); + + mndReleaseDb(pMnode, pDb); + mndReleaseStb(pMnode, pStb); + return TSDB_CODE_SUCCESS; +} + +static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) { char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName); @@ -2018,10 +2061,6 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char return -1; } - if (smaVer) { - *smaVer = pStb->smaVer; - } - int32_t code = mndBuildStbSchemaImp(pDb, pStb, tbName, pRsp); mndReleaseDb(pMnode, pDb); mndReleaseStb(pMnode, pStb); @@ -2664,7 +2703,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) { } } else { mInfo("stb:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName); - if (mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp, NULL) != 0) { + if (mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) { goto _OVER; } } @@ -2775,14 +2814,15 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t for (int32_t i = 0; i < numOfStbs; ++i) { SSTableVersion *pStbVersion = &pStbVersions[i]; pStbVersion->suid = be64toh(pStbVersion->suid); - pStbVersion->sversion = ntohs(pStbVersion->sversion); - pStbVersion->tversion = ntohs(pStbVersion->tversion); + pStbVersion->sversion = ntohl(pStbVersion->sversion); + pStbVersion->tversion = ntohl(pStbVersion->tversion); pStbVersion->smaVer = ntohl(pStbVersion->smaVer); - STableMetaRsp metaRsp = {0}; - int32_t smaVer = 0; - mInfo("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName); - if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp, &smaVer) != 0) { + bool schema = false; + bool sma = false; + int32_t code = mndValidateStbVersion(pMnode, pStbVersion, &schema, &sma); + if (TSDB_CODE_SUCCESS != code) { + STableMetaRsp metaRsp = {0}; metaRsp.numOfColumns = -1; metaRsp.suid = pStbVersion->suid; tstrncpy(metaRsp.dbFName, pStbVersion->dbFName, sizeof(metaRsp.dbFName)); @@ -2792,13 +2832,23 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t continue; } - if (pStbVersion->sversion != metaRsp.sversion || pStbVersion->tversion != metaRsp.tversion) { + if (schema) { + STableMetaRsp metaRsp = {0}; + mInfo("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName); + if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp) != 0) { + metaRsp.numOfColumns = -1; + metaRsp.suid = pStbVersion->suid; + tstrncpy(metaRsp.dbFName, pStbVersion->dbFName, sizeof(metaRsp.dbFName)); + tstrncpy(metaRsp.tbName, pStbVersion->stbName, sizeof(metaRsp.tbName)); + tstrncpy(metaRsp.stbName, pStbVersion->stbName, sizeof(metaRsp.stbName)); + taosArrayPush(hbRsp.pMetaRsp, &metaRsp); + continue; + } + taosArrayPush(hbRsp.pMetaRsp, &metaRsp); - } else { - tFreeSTableMetaRsp(&metaRsp); } - if (pStbVersion->smaVer && pStbVersion->smaVer != smaVer) { + if (sma) { bool exist = false; char tbFName[TSDB_TABLE_FNAME_LEN]; STableIndexRsp indexRsp = {0}; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 439e052ffc..2fe39eb533 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -30,6 +30,7 @@ extern "C" { #define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100 #define CTG_DEFAULT_CACHE_DB_NUMBER 20 #define CTG_DEFAULT_CACHE_TBLMETA_NUMBER 1000 +#define CTG_DEFAULT_CACHE_VIEW_NUMBER 256 #define CTG_DEFAULT_RENT_SECOND 10 #define CTG_DEFAULT_RENT_SLOT_SIZE 10 #define CTG_DEFAULT_MAX_RETRY_TIMES 3 @@ -542,6 +543,7 @@ typedef struct SCtgDropViewMetaMsg { char dbFName[TSDB_DB_FNAME_LEN]; char viewName[TSDB_VIEW_NAME_LEN]; uint64_t dbId; + uint64_t viewId; } SCtgDropViewMetaMsg; @@ -928,7 +930,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput* output, bool sy int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp* pAuth, bool syncReq); int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char* dbFName, int32_t vgId, SEpSet* pEpSet); int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex** pIndex, bool syncOp); -int32_t ctgDropViewMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *viewName, bool syncOp); +int32_t ctgDropViewMetaEnqueue(SCatalog *pCtg, const char *dbFName, uint64_t dbId, const char *viewName, uint64_t viewId, bool syncOp); int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool clearMeta, bool freeCtg, bool stopQueue, bool syncOp); int32_t ctgMetaRentInit(SCtgRentMgmt* mgmt, uint32_t rentSec, int8_t type, int32_t size); int32_t ctgMetaRentAdd(SCtgRentMgmt* mgmt, void* meta, int64_t id, int32_t size); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index a21089fa2d..b8888acde9 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -666,10 +666,10 @@ _return: } -int32_t ctgRemoveViewMeta(SCatalog* pCtg, SName* pViewName) { +int32_t ctgRemoveViewMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* viewName, uint64_t viewId) { int32_t code = 0; - if (NULL == pCtg || NULL == pViewName) { + if (NULL == pCtg || NULL == dbFName || NULL == viewName) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -677,9 +677,7 @@ int32_t ctgRemoveViewMeta(SCatalog* pCtg, SName* pViewName) { return TSDB_CODE_SUCCESS; } - char dbFName[TSDB_DB_FNAME_LEN] = {0}; - tNameGetFullDbName(pViewName, dbFName); - CTG_ERR_JRET(ctgDropViewMetaEnqueue(pCtg, dbFName, 0, pViewName->tname, true)); + CTG_ERR_JRET(ctgDropViewMetaEnqueue(pCtg, dbFName, 0, viewName, viewId, true)); _return: @@ -757,6 +755,10 @@ int32_t catalogInit(SCatalogCfg* cfg) { gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER; } + if (gCtgMgmt.cfg.maxViewCacheNum == 0) { + gCtgMgmt.cfg.maxViewCacheNum = CTG_DEFAULT_CACHE_VIEW_NUMBER; + } + if (gCtgMgmt.cfg.dbRentSec == 0) { gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND; } @@ -771,6 +773,7 @@ int32_t catalogInit(SCatalogCfg* cfg) { } else { gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER; gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER; + gCtgMgmt.cfg.maxViewCacheNum = CTG_DEFAULT_CACHE_VIEW_NUMBER; gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND; gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND; gCtgMgmt.cfg.viewRentSec = CTG_DEFAULT_RENT_SECOND; @@ -1503,7 +1506,9 @@ int32_t catalogGetExpiredViews(SCatalog* pCtg, SViewVersion** views, uint32_t* n if (NULL == *dynViewVersion) { CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY); } - **dynViewVersion = pCtg->dynViewVer; + + (*dynViewVersion)->svrBootTs = atomic_load_64(&pCtg->dynViewVer.svrBootTs); + (*dynViewVersion)->dynViewVer = atomic_load_64(&pCtg->dynViewVer.dynViewVer); CTG_API_LEAVE(ctgMetaRentGet(&pCtg->viewRent, (void**)views, num, sizeof(SViewVersion))); } @@ -1703,10 +1708,58 @@ void catalogFreeMetaData(SMetaData * pData) { } -int32_t catalogRemoveViewMeta(SCatalog* pCtg, SName* pViewName) { +int32_t catalogRemoveViewMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* viewName, uint64_t viewId) { CTG_API_ENTER(); - CTG_API_LEAVE(ctgRemoveViewMeta(pCtg, pViewName)); + CTG_API_LEAVE(ctgRemoveViewMeta(pCtg, dbFName, dbId, viewName, viewId)); +} + + +int32_t catalogUpdateDynViewVer(SCatalog* pCtg, SDynViewVersion* pVer) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pVer) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + atomic_store_64(&pCtg->dynViewVer.svrBootTs, pVer->svrBootTs); + atomic_store_64(&pCtg->dynViewVer.dynViewVer, pVer->dynViewVer); + + ctgDebug("cluster %" PRIx64 " svrBootTs updated to %" PRId64, pCtg->clusterId, pVer->svrBootTs); + ctgDebug("cluster %" PRIx64 " dynViewVer updated to %" PRId64, pCtg->clusterId, pVer->dynViewVer); + + CTG_API_LEAVE(TSDB_CODE_SUCCESS); +} + +int32_t catalogUpdateViewMeta(SCatalog* pCtg, SViewMetaRsp* pMsg) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pMsg) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + int32_t code = 0; + CTG_ERR_JRET(ctgUpdateViewMetaToCache(pCtg, pMsg, true)); + +_return: + + CTG_API_LEAVE(code); +} + + +int32_t catalogAsyncUpdateViewMeta(SCatalog* pCtg, SViewMetaRsp* pMsg) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pMsg) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + int32_t code = 0; + CTG_ERR_JRET(ctgUpdateViewMetaToCache(pCtg, pMsg, false)); + +_return: + + CTG_API_LEAVE(code); } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 8cca214de7..30d434b629 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -31,7 +31,7 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v {CTG_OP_UPDATE_TB_INDEX, "update tbIndex", ctgOpUpdateTbIndex}, {CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex}, {CTG_OP_UPDATE_VIEW_META, "update viewMeta", ctgOpUpdateViewMeta}, - {CTG_OP_DROP_VIEW_META, "drop viewMeta", ctgOpDropTbMeta}, + {CTG_OP_DROP_VIEW_META, "drop viewMeta", ctgOpDropViewMeta}, {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}}; SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = { @@ -1310,7 +1310,7 @@ int32_t ctgUpdateViewMetaEnqueue(SCatalog *pCtg, SViewMetaRsp *pRsp, bool syncOp return TSDB_CODE_SUCCESS; } -int32_t ctgDropViewMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *viewName, bool syncOp) { +int32_t ctgDropViewMetaEnqueue(SCatalog *pCtg, const char *dbFName, uint64_t dbId, const char *viewName, uint64_t viewId, bool syncOp) { int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_DROP_VIEW_META; @@ -1327,6 +1327,7 @@ int32_t ctgDropViewMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); tstrncpy(msg->viewName, viewName, sizeof(msg->viewName)); msg->dbId = dbId; + msg->viewId = viewId; op->data = msg; @@ -1349,7 +1350,7 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == newDBCache.tbCache) { - ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum); + ctgError("taosHashInit %d tbCache failed", gCtgMgmt.cfg.maxTblCacheNum); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -1360,6 +1361,13 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } + newDBCache.viewCache = taosHashInit(gCtgMgmt.cfg.maxViewCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), + true, HASH_ENTRY_LOCK); + if (NULL == newDBCache.viewCache) { + ctgError("taosHashInit %d viewCache failed", gCtgMgmt.cfg.maxViewCacheNum); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache)); if (code) { if (HASH_NODE_EXIST(code)) { @@ -1635,7 +1643,6 @@ int32_t ctgWriteViewMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFN CTG_DB_NUM_INC(CTG_CI_VIEW); - pMeta = NULL; ctgDebug("view %s meta updated to cache, ver:%d", viewName, pMeta->version); CTG_ERR_RET(ctgUpdateRentViewVersion(pCtg, dbFName, viewName, dbCache->dbId, pMeta->viewId, &cache)); @@ -1656,12 +1663,12 @@ int32_t ctgWriteViewMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFN atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetViewMetaCacheSize(pMeta)); - pMeta = NULL; - ctgDebug("view %s meta updated to cache, ver:%d", viewName, pMeta->version); CTG_ERR_RET(ctgUpdateRentViewVersion(pCtg, dbFName, viewName, dbCache->dbId, pMeta->viewId, pCache)); + pMeta = NULL; + _return: if (pMeta) { @@ -2028,7 +2035,7 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) { goto _return; } - if (msg->dbId && (dbCache->dbId != msg->dbId)) { + if ((0 != msg->dbId) && (dbCache->dbId != msg->dbId)) { ctgDebug("dbId already modified, dbFName:%s, current:0x%" PRIx64 ", dbId:0x%" PRIx64 ", stb:%s, suid:0x%" PRIx64, msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid); goto _return; @@ -2091,7 +2098,7 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) { goto _return; } - if (dbCache->dbId != msg->dbId) { + if ((0 != msg->dbId) && (dbCache->dbId != msg->dbId)) { ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId, msg->dbFName, msg->tbName); goto _return; @@ -2370,7 +2377,7 @@ int32_t ctgOpDropViewMeta(SCtgCacheOperation *operation) { goto _return; } - if (0 != msg->dbId && dbCache->dbId != msg->dbId) { + if ((0 != msg->dbId) && (dbCache->dbId != msg->dbId)) { ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, viewName:%s", msg->dbId, dbCache->dbId, msg->dbFName, msg->viewName); goto _return; @@ -2383,6 +2390,11 @@ int32_t ctgOpDropViewMeta(SCtgCacheOperation *operation) { } int64_t viewId = pViewCache->pMeta->viewId; + if (0 != msg->viewId && viewId != msg->viewId) { + ctgDebug("viewId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", viewName:%s", msg->viewId, viewId, msg->viewName); + goto _return; + } + atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetViewMetaCacheSize(pViewCache->pMeta)); ctgFreeViewCacheImpl(pViewCache, true); diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index aad8a6f7c0..f6dc902928 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -261,12 +261,25 @@ void ctgFreeViewCacheImpl(SCtgViewCache* pCache, bool lock) { } } +void ctgFreeViewCache(SCtgDBCache* dbCache) { + if (NULL == dbCache->viewCache) { + return; + } + + SCtgViewCache* pCache = taosHashIterate(dbCache->viewCache, NULL); + while (NULL != pCache) { + ctgFreeViewCacheImpl(pCache, false); + pCache = taosHashIterate(dbCache->viewCache, pCache); + } + taosHashCleanup(dbCache->viewCache); + dbCache->viewCache = NULL; +} + void ctgFreeTbCache(SCtgDBCache* dbCache) { if (NULL == dbCache->tbCache) { return; } - int32_t tblNum = taosHashGetSize(dbCache->tbCache); SCtgTbCache* pCache = taosHashIterate(dbCache->tbCache, NULL); while (NULL != pCache) { ctgFreeTbCacheImpl(pCache, false); @@ -288,6 +301,7 @@ void ctgFreeDbCache(SCtgDBCache* dbCache) { ctgFreeCfgInfoCache(dbCache); ctgFreeStbMetaCache(dbCache); ctgFreeTbCache(dbCache); + ctgFreeViewCache(dbCache); } void ctgFreeInstDbCache(SHashObj* pDbCache) { diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index fea7860218..8b2d2d028f 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -383,7 +383,9 @@ static int32_t collectMetaKeyFromDescribe(SCollectMetaKeyCxt* pCxt, SDescribeStm int32_t code = catalogRemoveTableMeta(pCxt->pParseCxt->pCatalog, &name); #ifdef TD_ENTERPRISE if (TSDB_CODE_SUCCESS == code) { - code = catalogRemoveViewMeta(pCxt->pParseCxt->pCatalog, &name); + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(&name, dbFName); + code = catalogRemoveViewMeta(pCxt->pParseCxt->pCatalog, dbFName, 0, pStmt->tableName, 0); } #endif if (TSDB_CODE_SUCCESS == code) { @@ -626,7 +628,9 @@ static int32_t collectMetaKeyFromShowCreateView(SCollectMetaKeyCxt* pCxt, SShowC SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId}; strcpy(name.dbname, pStmt->dbName); strcpy(name.tname, pStmt->viewName); - int32_t code = catalogRemoveViewMeta(pCxt->pParseCxt->pCatalog, &name); + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(&name, dbFName); + int32_t code = catalogRemoveViewMeta(pCxt->pParseCxt->pCatalog, dbFName, 0, pStmt->viewName, 0); if (TSDB_CODE_SUCCESS == code) { code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, pStmt->viewName, AUTH_TYPE_READ, pCxt->pMetaCache);