From 0a0f5120238afc2297b30eb707ae8ca3c8367070 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 28 Sep 2023 11:44:38 +0800 Subject: [PATCH] feat: add view meta processing --- include/common/tmsg.h | 20 +- include/common/tmsgdef.h | 1 + include/libs/catalog/catalog.h | 15 +- include/libs/qcom/query.h | 15 + include/util/taoserror.h | 1 + source/common/src/tmsg.c | 71 +++ source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mnode/impl/inc/mndDef.h | 14 +- source/dnode/mnode/impl/src/mndView.c | 17 + source/libs/catalog/inc/catalogInt.h | 33 +- source/libs/catalog/src/catalog.c | 6 + source/libs/catalog/src/ctgAsync.c | 170 +++++++- source/libs/catalog/src/ctgCache.c | 458 ++++++++++---------- source/libs/catalog/src/ctgDbg.c | 2 + source/libs/catalog/src/ctgRemote.c | 72 +++ source/libs/catalog/src/ctgRent.c | 287 ++++++++++++ source/libs/catalog/src/ctgUtil.c | 63 +++ source/libs/parser/src/parAstParser.c | 13 +- source/libs/parser/src/parTranslater.c | 7 + source/libs/parser/src/parUtil.c | 12 +- source/libs/qcom/src/querymsg.c | 39 ++ source/util/src/terror.c | 1 + 22 files changed, 1054 insertions(+), 264 deletions(-) create mode 100755 source/libs/catalog/src/ctgRent.c diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 60d53b8311..87ae63ced2 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3912,7 +3912,6 @@ typedef struct { int8_t precision; int32_t numOfCols; SSchema* pSchema; - SRWLatch lock; } SCMCreateViewReq; int32_t tSerializeSCMCreateViewReq(void* buf, int32_t bufLen, const SCMCreateViewReq* pReq); @@ -3931,6 +3930,25 @@ int32_t tSerializeSCMDropViewReq(void* buf, int32_t bufLen, const SCMDropViewReq int32_t tDeserializeSCMDropViewReq(void* buf, int32_t bufLen, SCMDropViewReq* pReq); void tFreeSCMDropViewReq(SCMDropViewReq* pReq); +typedef struct { + char fullname[TSDB_VIEW_FNAME_LEN]; +} SViewMetaReq; +int32_t tSerializeSViewMetaReq(void* buf, int32_t bufLen, const SViewMetaReq* pReq); +int32_t tDeserializeSViewMetaReq(void* buf, int32_t bufLen, SViewMetaReq* pReq); + +typedef struct { + char name[TSDB_VIEW_NAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; + uint64_t dbId; + uint64_t viewId; + char* querySql; + int32_t version; +} SViewMetaRsp; +int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pRsp); +int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp); +void tFreeSViewMetaRsp(SViewMetaRsp* pRsp); + + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 2bdc998c95..4ba8531581 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -179,6 +179,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE, "retrieve-ip-white", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_WHITELIST, "get-user-whitelist", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_VIEW_META, "view-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 8f5679c846..ea1cb02aad 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -39,6 +39,7 @@ enum { CTG_DBG_STB_NUM, CTG_DBG_DB_RENT_NUM, CTG_DBG_STB_RENT_NUM, + CTG_DBG_VIEW_RENT_NUM, }; typedef enum { @@ -111,7 +112,7 @@ typedef struct SMetaData { SArray* pTableCfg; // pRes = STableCfg* SArray* pTableTag; // pRes = SArray* SArray* pDnodeList; // pRes = SArray* - SArray* pView; // pRes = SViewInfo* + SArray* pView; // pRes = SViewMeta* SMetaRes* pSvrVer; // pRes = char* } SMetaData; @@ -121,12 +122,13 @@ typedef struct SCatalogCfg { uint32_t maxUserCacheNum; uint32_t dbRentSec; uint32_t stbRentSec; + uint32_t viewRentSec; } SCatalogCfg; typedef struct SSTableVersion { char dbFName[TSDB_DB_FNAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN]; - uint64_t dbId; + int64_t dbId; uint64_t suid; int32_t sversion; int32_t tversion; @@ -142,6 +144,15 @@ typedef struct SDbCacheInfo { int64_t stateTs; } SDbCacheInfo; +typedef struct SViewVersion { + char dbFName[TSDB_DB_FNAME_LEN]; + char viewName[TSDB_VIEW_NAME_LEN]; + int64_t dbId; + uint64_t viewId; + int32_t version; +} SViewVersion; + + typedef struct STbSVersion { char* tbFName; int32_t sver; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 71b8badb13..b776893e28 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -119,6 +119,12 @@ typedef struct STableMeta { } STableMeta; #pragma pack(pop) +typedef struct SViewMeta { + int32_t version; + uint64_t viewId; + char* querySql; +} SViewMeta; + typedef struct SDBVgInfo { int32_t vgVersion; int16_t hashPrefix; @@ -148,6 +154,15 @@ typedef struct STableMetaOutput { STableMeta* tbMeta; } STableMetaOutput; +typedef struct SViewMetaOutput { + char name[TSDB_VIEW_NAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; + char* querySql; + int8_t precision; + int32_t numOfCols; + SSchema* pSchema; +} SViewMetaOutput; + typedef struct SDataBuf { int32_t msgType; void* pData; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2c0b513889..ce9a72625c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -727,6 +727,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_INVALID_IP_RANGE TAOS_DEF_ERROR_CODE(0, 0x266B) #define TSDB_CODE_PAR_INVALID_VIEW_QUERY TAOS_DEF_ERROR_CODE(0, 0x266C) #define TSDB_CODE_PAR_COL_QUERY_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x266D) +#define TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE TAOS_DEF_ERROR_CODE(0, 0x266E) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e8b62b618f..6219b264c4 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -8536,5 +8536,76 @@ void tFreeSCMDropViewReq(SCMDropViewReq* pReq) { taosMemoryFree(pReq->sql); } +int32_t tSerializeSViewMetaReq(void* buf, int32_t bufLen, const SViewMetaReq* pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->fullname) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSViewMetaReq(void* buf, int32_t bufLen, SViewMetaReq* pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->fullname) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pRsp) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->name) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1; + if (tEncodeU64(&encoder, pRsp->dbId) < 0) return -1; + if (tEncodeU64(&encoder, pRsp->viewId) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->querySql) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->version) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->name) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1; + if (tDecodeU64(&decoder, &pRsp->dbId) < 0) return -1; + if (tDecodeU64(&decoder, &pRsp->viewId) < 0) return -1; + if (tDecodeCStrAlloc(&decoder, &pRsp->querySql) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +void tFreeSViewMetaRsp(SViewMetaRsp* pRsp) { + if (NULL == pRsp) { + return; + } + + taosMemoryFree(pRsp->querySql); +} diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 4501a92754..e98ff061e9 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -187,6 +187,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_RESTORE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_VIEW, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_VIEW, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_VIEW_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 2f3fec0f60..aef2e57c4d 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -729,7 +729,19 @@ void tFreeStreamObj(SStreamObj* pObj); // } SStreamCheckpointObj; -typedef SCMCreateViewReq SViewObj; +typedef struct { + char fullname[TSDB_VIEW_FNAME_LEN]; + char name[TSDB_VIEW_NAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; + char* querySql; + uint64_t viewId; + uint64_t dbId; + int32_t version; + int8_t precision; + int32_t numOfCols; + SSchema* pSchema; + SRWLatch lock; +} SViewObj; int32_t tEncodeSViewObj(SEncoder* pEncoder, const SViewObj* pObj); int32_t tDecodeSViewObj(SDecoder* pDecoder, SViewObj* pObj, int32_t sver); diff --git a/source/dnode/mnode/impl/src/mndView.c b/source/dnode/mnode/impl/src/mndView.c index 14d6630d78..9955284917 100755 --- a/source/dnode/mnode/impl/src/mndView.c +++ b/source/dnode/mnode/impl/src/mndView.c @@ -19,6 +19,7 @@ int32_t mndInitView(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_VIEW, mndProcessCreateViewReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_VIEW, mndProcessDropViewReq); + mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessGetViewMetaReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndRetrieveView); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndCancelGetNextView); @@ -76,6 +77,22 @@ int32_t mndProcessDropViewReq(SRpcMsg *pReq) { #endif } +static int32_t mndProcessViewMetaReq(SRpcMsg *pReq) { +#ifndef TD_ENTERPRISE + return TSDB_CODE_OPS_NOT_SUPPORT; +#else + SViewMetaReq req = {0}; + + if (tDeserializeSViewMetaReq(pReq->pCont, pReq->contLen, &req) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + return mndProcessTableMetaReqImpl(&req, pReq); +#endif +} + + int32_t mndRetrieveView(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { #if 0 SMnode *pMnode = pReq->info.node; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 7d47e82164..72d26cdd5a 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -67,6 +67,7 @@ typedef enum { CTG_CI_USER, CTG_CI_UDF, CTG_CI_SVR_VER, + CTG_CI_VIEW, CTG_CI_MAX_VALUE, } CTG_CACHE_ITEM; @@ -82,6 +83,7 @@ enum { enum { CTG_RENT_DB = 1, CTG_RENT_STABLE, + CTG_RENT_VIEW, }; enum { @@ -96,6 +98,7 @@ enum { CTG_OP_UPDATE_VG_EPSET, CTG_OP_UPDATE_TB_INDEX, CTG_OP_DROP_TB_INDEX, + CTG_OP_UPDATE_VIEW_META, CTG_OP_CLEAR_CACHE, CTG_OP_MAX }; @@ -117,6 +120,7 @@ typedef enum { CTG_TASK_GET_TB_META_BATCH, CTG_TASK_GET_TB_HASH_BATCH, CTG_TASK_GET_TB_TAG, + CTG_TASK_GET_VIEW, } CTG_TASK_TYPE; typedef enum { @@ -240,6 +244,14 @@ typedef struct SCtgUserCtx { SUserAuthInfo user; } SCtgUserCtx; +typedef struct SCtgViewsCtx { + int32_t fetchNum; + SArray* pNames; + SArray* pResList; + SArray* pFetchs; +} SCtgViewsCtx; + + typedef STableIndexRsp STableIndex; typedef struct SCtgTbCache { @@ -259,12 +271,19 @@ typedef struct SCtgCfgCache { SDbCfgInfo* cfgInfo; } SCtgCfgCache; +typedef struct SCtgViewCache { + SRWLatch viewLock; + SViewMeta* pMeta; +} SCtgViewCache; + + typedef struct SCtgDBCache { SRWLatch dbLock; // RC between destroy tbCache/stbCache and all reads uint64_t dbId; int8_t deleted; SCtgVgCache vgCache; SCtgCfgCache cfgCache; + SHashObj* viewCache; // key:viewname, value:SCtgViewCache SHashObj* tbCache; // key:tbname, value:SCtgTbCache SHashObj* stbCache; // key:suid, value:char* uint64_t dbCacheNum[CTG_CI_MAX_VALUE]; @@ -300,6 +319,7 @@ typedef struct SCatalog { SHashObj* dbCache; // key:dbname, value:SCtgDBCache SCtgRentMgmt dbRent; SCtgRentMgmt stbRent; + SCtgRentMgmt viewRent; SCtgCacheStat cacheStat; } SCatalog; @@ -344,6 +364,7 @@ typedef struct SCtgJob { int32_t tbIndexNum; int32_t tbCfgNum; int32_t svrVerNum; + int32_t viewNum; } SCtgJob; typedef struct SCtgMsgCtx { @@ -509,6 +530,12 @@ typedef struct SCtgUpdateEpsetMsg { SEpSet epSet; } SCtgUpdateEpsetMsg; +typedef struct SCtgUpdateViewMetaMsg { + SCatalog* pCtg; + SViewMetaRsp* pRsp; +} SCtgUpdateViewMetaMsg; + + typedef struct SCtgCacheOperation { int32_t opId; void* data; @@ -686,10 +713,10 @@ typedef struct SCtgCacheItemInfo { (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || \ (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE)) +#define CTG_IS_BATCH_TASK(_taskType) ((CTG_TASK_GET_TB_META_BATCH == (_taskType)) || (CTG_TASK_GET_TB_HASH_BATCH == (_taskType)) || (CTG_TASK_GET_VIEW == (_taskType))) + #define CTG_GET_TASK_MSGCTX(_task, _id) \ - (((CTG_TASK_GET_TB_META_BATCH == (_task)->type) || (CTG_TASK_GET_TB_HASH_BATCH == (_task)->type)) \ - ? taosArrayGet((_task)->msgCtxs, (_id)) \ - : &(_task)->msgCtx) + (CTG_IS_BATCH_TASK((_task)->type) ? taosArrayGet((_task)->msgCtxs, (_id)) : &(_task)->msgCtx) #define CTG_META_SIZE(pMeta) \ (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema)) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index f975517669..8ceb2b62d1 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -739,11 +739,16 @@ int32_t catalogInit(SCatalogCfg* cfg) { if (gCtgMgmt.cfg.stbRentSec == 0) { gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND; } + + if (gCtgMgmt.cfg.viewRentSec == 0) { + gCtgMgmt.cfg.viewRentSec = CTG_DEFAULT_RENT_SECOND; + } } else { gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER; gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER; gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND; gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND; + gCtgMgmt.cfg.viewRentSec = CTG_DEFAULT_RENT_SECOND; } gCtgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), @@ -828,6 +833,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbCacheInfo))); CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE, sizeof(SSTableVersion))); + CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->viewRent, gCtgMgmt.cfg.viewRentSec, CTG_RENT_VIEW, sizeof(SViewVersion))); clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index fb5ecf7ad2..9df8a51128 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -53,7 +53,6 @@ int32_t ctgInitGetTbMetaTask(SCtgJob* pJob, int32_t taskIdx, void* param) { } int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) { - SName* name = (SName*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_TB_META_BATCH; @@ -184,7 +183,6 @@ int32_t ctgInitGetTbHashTask(SCtgJob* pJob, int32_t taskIdx, void* param) { } int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) { - SName* name = (SName*)param; SCtgTask task = {0}; task.type = CTG_TASK_GET_TB_HASH_BATCH; @@ -417,6 +415,30 @@ int32_t ctgInitGetTbTagTask(SCtgJob* pJob, int32_t taskIdx, void* param) { return TSDB_CODE_SUCCESS; } +int32_t ctgInitGetViewsTask(SCtgJob* pJob, int32_t taskIdx, void* param) { + SCtgTask task = {0}; + + task.type = CTG_TASK_GET_VIEW; + task.taskId = taskIdx; + task.pJob = pJob; + + task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgViewsCtx)); + if (NULL == task.taskCtx) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + SCtgViewsCtx* ctx = task.taskCtx; + ctx->pNames = param; + ctx->pResList = taosArrayInit(pJob->viewNum, sizeof(SMetaRes)); + + taosArrayPush(pJob->pTasks, &task); + + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%lu, viewNum:%d", pJob->queryId, taskIdx, + ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->viewNum); + + return TSDB_CODE_SUCCESS; +} + int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, const SCatalogReq* pReq) { SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); @@ -547,9 +569,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex); int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg); int32_t tbTagNum = (int32_t)taosArrayGetSize(pReq->pTableTag); + int32_t viewNum = (int32_t)ctgGetTablesReqNum(pReq->pView); int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + - userNum + dbInfoNum + tbIndexNum + tbCfgNum + tbTagNum; + userNum + dbInfoNum + tbIndexNum + tbCfgNum + tbTagNum + viewNum; *job = taosMemoryCalloc(1, sizeof(SCtgJob)); if (NULL == *job) { @@ -580,6 +603,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const pJob->tbCfgNum = tbCfgNum; pJob->svrVerNum = svrVerNum; pJob->tbTagNum = tbTagNum; + pJob->viewNum = viewNum; #if CTG_BATCH_FETCH pJob->pBatchs = @@ -668,6 +692,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_USER, user, NULL)); } + if (tbHashNum > 0) { + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_VIEW, pReq->pView, NULL)); + } + if (qnodeNum) { CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_QNODE, NULL, NULL)); } @@ -1773,6 +1801,65 @@ _return: CTG_RET(code); } +int32_t ctgHandleGetViewsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; + SCtgTask* pTask = tReq->pTask; + SCatalog* pCtg = pTask->pJob->pCtg; + SRequestConnInfo* pConn = &pTask->pJob->conn; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); + SCtgViewsCtx* ctx = (SCtgViewsCtx*)pTask->taskCtx; + SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); + SName* pName = ctgGetFetchName(ctx->pNames, pFetch); + int32_t flag = pFetch->flag; + int32_t* vgId = &pFetch->vgId; + bool taskDone = false; + + CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); + + SViewMetaRsp* pRsp = (SViewMetaRsp**)pMsgCtx->out; + SViewMeta* pViewMeta = taosMemoryCalloc(1, sizeof(SViewMeta)); + if (NULL == pViewMeta) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + pViewMeta->querySql = strdup(pRsp->querySql); + if (NULL == pViewMeta->querySql) { + taosMemoryFree(pViewMeta); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + ctgUpdateViewMetaToCache(pCtg, pRsp, false); + + SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx); + pRes->code = 0; + pRes->pRes = pViewMeta; + if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { + TSWAP(pTask->res, ctx->pResList); + taskDone = true; + } + +_return: + + if (code) { + SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx); + pRes->code = code; + pRes->pRes = NULL; + ctgTaskError("Get view %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, + tstrerror(code)); + if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { + TSWAP(pTask->res, ctx->pResList); + taskDone = true; + } + } + + if (pTask->res && taskDone) { + ctgHandleTaskEnd(pTask, code); + } + + CTG_RET(code); +} + + int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq* tReq, int32_t flag, SName* pName, int32_t* vgId) { SCtgTask* pTask = tReq->pTask; SCatalog* pCtg = pTask->pJob->pCtg; @@ -2355,6 +2442,59 @@ int32_t ctgLaunchGetSvrVerTask(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } +int32_t ctgLaunchGetViewsTask(SCtgTask* pTask) { + SCatalog* pCtg = pTask->pJob->pCtg; + SRequestConnInfo* pConn = &pTask->pJob->conn; + SCtgViewsCtx* pCtx = (SCtgViewsCtx*)pTask->taskCtx; + SCtgJob* pJob = pTask->pJob; + bool tbMetaDone = false; + + ctgIsTaskDone(pJob, CTG_TASK_GET_TB_META_BATCH, &tbMetaDone); + if (tbMetaDone) { + CTG_ERR_RET(ctgBuildViewNullRes(pTask, pCtx)); + TSWAP(pTask->res, pCtx->pResList); + + CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0)); + return TSDB_CODE_SUCCESS; + } + + int32_t dbNum = taosArrayGetSize(pCtx->pNames); + int32_t fetchIdx = 0; + int32_t baseResIdx = 0; + for (int32_t i = 0; i < dbNum; ++i) { + STablesReq* pReq = taosArrayGet(pCtx->pNames, i); + ctgDebug("start to check views in db %s, viewNum %ld", pReq->dbFName, taosArrayGetSize(pReq->pTables)); + CTG_ERR_RET(ctgGetViewsFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables)); + baseResIdx += taosArrayGetSize(pReq->pTables); + } + + pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs); + if (pCtx->fetchNum <= 0) { + TSWAP(pTask->res, pCtx->pResList); + + CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0)); + return TSDB_CODE_SUCCESS; + } + + pTask->msgCtxs = taosArrayInit_s(sizeof(SCtgMsgCtx), pCtx->fetchNum); + for (int32_t i = 0; i < pCtx->fetchNum; ++i) { + SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); + SName* pName = ctgGetFetchName(pCtx->pNames, pFetch); + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } + + SCtgTaskReq tReq; + tReq.pTask = pTask; + tReq.msgIdx = pFetch->fetchIdx; + CTG_ERR_RET(ctgGetViewInfoFromMnode(pCtg, pConn, pName, NULL, &tReq)); + } + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgRelaunchGetTbMetaTask(SCtgTask* pTask) { ctgResetTbMetaTask(pTask); @@ -2470,6 +2610,7 @@ SCtgAsyncFps gCtgAsyncFps[] = { {ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetasRsp, ctgDumpTbMetasRes, NULL, NULL}, {ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL}, {ctgInitGetTbTagTask, ctgLaunchGetTbTagTask, ctgHandleGetTbTagRsp, ctgDumpTbTagRes, NULL, NULL}, + {ctgInitGetViewsTask, ctgLaunchGetViewsTask, ctgHandleGetViewsRsp, ctgDumpViewsRes, NULL, NULL}, }; int32_t ctgMakeAsyncRes(SCtgJob* pJob) { @@ -2541,6 +2682,29 @@ _return: CTG_RET(code); } +void ctgIsTaskDone(SCtgJob* pJob, CTG_TASK_TYPE type, bool* done) { + SCtgTask* pTask = NULL; + + *done = true; + + CTG_LOCK(CTG_READ, &pJob->taskLock); + + int32_t taskNum = taosArrayGetSize(pJob->pTasks); + for (int32_t i = 0; i < taskNum; ++i) { + pTask = taosArrayGet(pJob->pTasks, i); + if (type != pTask->type) { + continue; + } + + if (pTask->status != CTG_TASK_DONE) { + *done = false; + break; + } + } + + CTG_UNLOCK(CTG_READ, &pJob->taskLock); +} + SCtgTask* ctgGetTask(SCtgJob* pJob, int32_t taskId) { int32_t taskNum = taosArrayGetSize(pJob->pTasks); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 44de83b7ef..4c0adf73eb 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -30,6 +30,7 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v {CTG_OP_UPDATE_VG_EPSET, "update epset", ctgOpUpdateEpset}, {CTG_OP_UPDATE_TB_INDEX, "update tbIndex", ctgOpUpdateTbIndex}, {CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex}, + {CTG_OP_UPDATE_VIEW_META, "update viewMeta", ctgOpUpdateViewMeta}, {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}}; SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = { @@ -171,6 +172,18 @@ void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache * } } +void ctgReleaseViewMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgViewCache *pCache) { + if (pCache && dbCache) { + CTG_UNLOCK(CTG_READ, &pCache->viewLock); + taosHashRelease(dbCache->viewCache, pCache); + } + + if (dbCache) { + ctgReleaseDBCache(pCtg, dbCache); + } +} + + void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) { if (pCache) { CTG_UNLOCK(CTG_READ, &pCache->indexLock); @@ -1266,205 +1279,31 @@ _return: CTG_RET(code); } -int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type, int32_t size) { - mgmt->slotRIdx = 0; - mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND; - mgmt->type = type; - mgmt->metaSize = size; +int32_t ctgUpdateViewMetaEnqueue(SCatalog *pCtg, SViewMetaRsp *pRsp, bool syncOp) { + int32_t code = 0; + SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); + op->opId = CTG_OP_UPDATE_VIEW_META; + op->syncOp = syncOp; - size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum; - - mgmt->slots = taosMemoryCalloc(1, msgSize); - if (NULL == mgmt->slots) { - qError("calloc %d failed", (int32_t)msgSize); + SCtgUpdateViewMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateViewMetaMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateViewMetaMsg)); + taosMemoryFree(op); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - mgmt->rentCacheSize = msgSize; - - qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum); - - return TSDB_CODE_SUCCESS; -} - -int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) { - int16_t widx = abs((int)(id % mgmt->slotNum)); - - SCtgRentSlot *slot = &mgmt->slots[widx]; - int32_t code = 0; - - CTG_LOCK(CTG_WRITE, &slot->lock); - if (NULL == slot->meta) { - slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size); - if (NULL == slot->meta) { - qError("taosArrayInit %d failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, - mgmt->type); - CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - } + char *p = strchr(pRsp->dbFName, '.'); + if (p && IS_SYS_DBNAME(p + 1)) { + int32_t len = strlen(p + 1); + memmove(pRsp->dbFName, p + 1, len >= TSDB_DB_FNAME_LEN ? TSDB_DB_FNAME_LEN - 1 : len); } - if (NULL == taosArrayPush(slot->meta, meta)) { - qError("taosArrayPush meta to rent failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); - CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - } + msg->pCtg = pCtg; + msg->pRsp = pRsp; - mgmt->rentCacheSize += size; - slot->needSort = true; + op->data = msg; - qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); - -_return: - - CTG_UNLOCK(CTG_WRITE, &slot->lock); - CTG_RET(code); -} - -int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare, - __compar_fn_t searchCompare) { - int16_t widx = abs((int)(id % mgmt->slotNum)); - - SCtgRentSlot *slot = &mgmt->slots[widx]; - int32_t code = 0; - - CTG_LOCK(CTG_WRITE, &slot->lock); - if (NULL == slot->meta) { - qDebug("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - - if (slot->needSort) { - qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, - (int32_t)taosArrayGetSize(slot->meta)); - taosArraySort(slot->meta, sortCompare); - slot->needSort = false; - qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); - } - - void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ); - if (NULL == orig) { - qDebug("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, - (int32_t)taosArrayGetSize(slot->meta)); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - - memcpy(orig, meta, size); - - qDebug("meta in rent updated, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); - -_return: - - CTG_UNLOCK(CTG_WRITE, &slot->lock); - - if (code) { - qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%" PRIx64 ", slot idx:%d, type:%d", code, id, - widx, mgmt->type); - CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size)); - } - - CTG_RET(code); -} - -int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) { - int16_t widx = abs((int)(id % mgmt->slotNum)); - - SCtgRentSlot *slot = &mgmt->slots[widx]; - int32_t code = 0; - - CTG_LOCK(CTG_WRITE, &slot->lock); - if (NULL == slot->meta) { - qError("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - - if (slot->needSort) { - taosArraySort(slot->meta, sortCompare); - slot->needSort = false; - qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type); - } - - int32_t idx = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ); - if (idx < 0) { - qError("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - - taosArrayRemove(slot->meta, idx); - mgmt->rentCacheSize -= mgmt->metaSize; - - qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); - -_return: - - CTG_UNLOCK(CTG_WRITE, &slot->lock); - - CTG_RET(code); -} - -int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) { - int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1); - if (ridx >= mgmt->slotNum) { - ridx %= mgmt->slotNum; - atomic_store_16(&mgmt->slotRIdx, ridx); - } - - SCtgRentSlot *slot = &mgmt->slots[ridx]; - int32_t code = 0; - - CTG_LOCK(CTG_READ, &slot->lock); - if (NULL == slot->meta) { - qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type); - *num = 0; - goto _return; - } - - size_t metaNum = taosArrayGetSize(slot->meta); - if (metaNum <= 0) { - qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type); - *num = 0; - goto _return; - } - - size_t msize = metaNum * size; - *res = taosMemoryMalloc(msize); - if (NULL == *res) { - qError("malloc %d failed", (int32_t)msize); - CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - } - - void *meta = taosArrayGet(slot->meta, 0); - - memcpy(*res, meta, msize); - - *num = (uint32_t)metaNum; - - qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type); - -_return: - - CTG_UNLOCK(CTG_READ, &slot->lock); - - CTG_RET(code); -} - -int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) { - while (true) { - int64_t msec = taosGetTimestampMs(); - int64_t lsec = atomic_load_64(&mgmt->lastReadMsec); - if ((msec - lsec) < CTG_RENT_SLOT_SECOND * 1000) { - *res = NULL; - *num = 0; - qDebug("too short time period to get expired meta, type:%d", mgmt->type); - return TSDB_CODE_SUCCESS; - } - - if (lsec != atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lsec, msec)) { - continue; - } - - break; - } - - CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size)); + CTG_ERR_RET(ctgEnqueue(pCtg, op)); return TSDB_CODE_SUCCESS; } @@ -1522,25 +1361,6 @@ _return: CTG_RET(code); } -void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) { - if (NULL == dbCache->stbCache) { - return; - } - - void *pIter = taosHashIterate(dbCache->stbCache, NULL); - while (pIter) { - uint64_t *suid = NULL; - suid = taosHashGetKey(pIter, NULL); - - if (TSDB_CODE_SUCCESS == - ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) { - ctgDebug("stb removed from rent, suid:0x%" PRIx64, *suid); - } - - pIter = taosHashIterate(dbCache->stbCache, pIter); - } -} - int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *dbFName) { uint64_t dbId = dbCache->dbId; @@ -1609,30 +1429,6 @@ int32_t ctgGetAddDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCt return TSDB_CODE_SUCCESS; } -int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid, - SCtgTbCache *pCache) { - SSTableVersion metaRent = {.dbId = dbId, .suid = suid}; - if (pCache->pMeta) { - metaRent.sversion = pCache->pMeta->sversion; - metaRent.tversion = pCache->pMeta->tversion; - } - - if (pCache->pIndex) { - metaRent.smaVer = pCache->pIndex->version; - } - - tstrncpy(metaRent.dbFName, dbFName, sizeof(metaRent.dbFName)); - tstrncpy(metaRent.stbName, tbName, sizeof(metaRent.stbName)); - - CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableVersion), - ctgStbVersionSortCompare, ctgStbVersionSearchCompare)); - - ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", dbFName, dbId, - tbName, suid, metaRent.sversion, metaRent.tversion, metaRent.smaVer); - - return TSDB_CODE_SUCCESS; -} - int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName, STableMeta *meta, int32_t metaSize) { if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) { @@ -1785,6 +1581,63 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa return TSDB_CODE_SUCCESS; } +int32_t ctgWriteViewMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *viewName, SViewMeta *pMeta) { + if (NULL == dbCache->viewCache) { + ctgWarn("db is dropping, dbId:0x%" PRIx64, dbCache->dbId); + CTG_ERR_JRET(TSDB_CODE_CTG_DB_DROPPED); + } + + SCtgViewCache *pCache = taosHashGet(dbCache->viewCache, viewName, strlen(viewName)); + if (NULL == pCache) { + SCtgViewCache cache = {0}; + cache.pMeta = pMeta; + + if (taosHashPut(dbCache->viewCache, viewName, strlen(viewName), &cache, sizeof(cache)) != 0) { + ctgError("taosHashPut new tbCache failed, viewName:%s", viewName); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + atomic_add_fetch_64(&dbCache->dbCacheSize, strlen(viewName) + sizeof(SCtgViewCache) + ctgGetViewMetaCacheSize(pMeta)); + + CTG_DB_NUM_INC(CTG_CI_VIEW); + + pMeta = NULL; + ctgDebug("view %s meta updated to cache, ver:%d", viewName, pMeta->version); + + CTG_ERR_RET(ctgUpdateRentViewVersion(pCtg, dbFName, viewName, dbCache->dbId, pMeta->viewId, &cache)); + + return TSDB_CODE_SUCCESS; + } + + CTG_LOCK(CTG_WRITE, &pCache->viewLock); + + if (pCache->pMeta) { + atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pCache->pMeta)); + taosMemoryFree(pCache->pMeta->querySql); + taosMemoryFree(pCache->pMeta); + } + + pCache->pMeta = pMeta; + CTG_UNLOCK(CTG_WRITE, &pCache->viewLock); + + atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pMeta)); + + pMeta = NULL; + + ctgDebug("view %s meta updated to cache, ver:%d", viewName, pMeta->version); + + CTG_ERR_RET(ctgUpdateRentViewVersion(pCtg, dbFName, viewName, dbCache->dbId, pMeta->viewId, pCache)); + +_return: + + if (pMeta) { + taosMemoryFree(pMeta->querySql); + taosMemoryFree(pMeta); + } + + return TSDB_CODE_SUCCESS; +} + int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool syncReq) { STableMetaOutput *pOutput = NULL; int32_t code = 0; @@ -1802,6 +1655,11 @@ _return: CTG_RET(code); } +int32_t ctgUpdateViewMetaToCache(SCatalog *pCtg, SViewMetaRsp *pRsp, bool syncReq) { + CTG_RET(ctgUpdateViewMetaEnqueue(pCtg, pRsp, syncReq)); +} + + void ctgClearAllHandles(void) { SCatalog *pCtg = NULL; @@ -2416,6 +2274,46 @@ _return: CTG_RET(code); } + +int32_t ctgOpUpdateViewMeta(SCtgCacheOperation *operation) { + int32_t code = 0; + SCtgUpdateViewMetaMsg *msg = operation->data; + SCatalog *pCtg = msg->pCtg; + SViewMetaRsp *pRsp = msg->pRsp; + SCtgDBCache *dbCache = NULL; + + if (pCtg->stopUpdate) { + goto _return; + } + + CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pRsp->dbFName, pRsp->dbId, &dbCache)); + if (NULL == dbCache) { + ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pRsp->dbFName, pRsp->dbId); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + SViewMeta *pMeta = taosMemoryCalloc(1, sizeof(SViewMeta)); + if (NULL == pMeta) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + pMeta->querySql = strdup(pRsp->querySql); + if (NULL == pMeta->querySql) { + taosMemoryFree(pMeta); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + pMeta->viewId = pRsp->viewId; + pMeta->version = pRsp->version; + + CTG_ERR_JRET(ctgWriteViewMetaToCache(pCtg, dbCache, pRsp->dbFName, pRsp->name, pMeta)); + +_return: + + taosMemoryFreeClear(msg); + + CTG_RET(code); +} + + void ctgClearFreeCache(SCtgCacheOperation *operation) { SCtgClearCacheMsg *msg = operation->data; SCatalog *pCtg = msg->pCtg; @@ -2776,6 +2674,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe CTG_LOCK(CTG_READ, &pCache->metaLock); if (NULL == pCache->pMeta) { CTG_UNLOCK(CTG_READ, &pCache->metaLock); + taosHashRelease(dbCache->tbCache, pCache); ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName); ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); taosArrayPush(ctx->pResList, &(SMetaRes){0}); @@ -2995,3 +2894,94 @@ _return: CTG_RET(code); } + + +int32_t ctgGetViewsFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgViewsCtx *ctx, int32_t dbIdx, + int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) { + int32_t tbNum = taosArrayGetSize(pList); + SName *pName = taosArrayGet(pList, 0); + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + int32_t flag = CTG_FLAG_UNKNOWN_STB; + uint64_t lastSuid = 0; + STableMeta *lastTableMeta = NULL; + + if (IS_SYS_DBNAME(pName->dbname)) { + CTG_FLAG_SET_SYS_DB(flag); + strcpy(dbFName, pName->dbname); + } else { + tNameGetFullDbName(pName, dbFName); + } + + SCtgDBCache *dbCache = NULL; + SCtgViewCache *pCache = NULL; + ctgAcquireDBCache(pCtg, dbFName, &dbCache); + + if (NULL == dbCache) { + ctgDebug("db %s not in cache", dbFName); + for (int32_t i = 0; i < tbNum; ++i) { + ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); + taosArrayPush(ctx->pResList, &(SMetaData){0}); + } + + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < tbNum; ++i) { + pName = taosArrayGet(pList, i); + + pCache = taosHashAcquire(dbCache->viewCache, pName->tname, strlen(pName->tname)); + if (NULL == pCache) { + ctgDebug("view %s not in cache, dbFName:%s", pName->tname, dbFName); + ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); + taosArrayPush(ctx->pResList, &(SMetaRes){0}); + CTG_CACHE_NHIT_INC(CTG_CI_VIEW, 1); + + continue; + } + + CTG_LOCK(CTG_READ, &pCache->viewLock); + if (NULL == pCache->pMeta) { + CTG_UNLOCK(CTG_READ, &pCache->viewLock); + taosHashRelease(dbCache->viewCache, pCache); + ctgDebug("view %s meta not in cache, dbFName:%s", pName->tname, dbFName); + ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); + taosArrayPush(ctx->pResList, &(SMetaRes){0}); + CTG_CACHE_NHIT_INC(CTG_CI_VIEW, 1); + + continue; + } + + CTG_CACHE_HIT_INC(CTG_CI_VIEW, 1); + + SMetaRes res = {0}; + SViewMeta *pViewMeta = taosMemoryCalloc(1, sizeof(SViewMeta)); + if (NULL == pViewMeta) { + ctgReleaseViewMetaToCache(pCtg, dbCache, pCache); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + pViewMeta->querySql = strdup(pCache->pMeta->querySql); + if (NULL == pViewMeta->querySql) { + ctgReleaseViewMetaToCache(pCtg, dbCache, pCache); + taosMemoryFree(pViewMeta); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + pViewMeta->version = pCache->pMeta->version; + pViewMeta->viewId = pCache->pMeta->viewId; + + CTG_UNLOCK(CTG_READ, &pCache->viewLock); + taosHashRelease(dbCache->viewCache, pCache); + + ctgDebug("Got view %s meta from cache, dbFName:%s", pName->tname, dbFName); + + res.pRes = pViewMeta; + taosArrayPush(ctx->pResList, &res); + } + + ctgReleaseDBCache(pCtg, dbCache); + + return TSDB_CODE_SUCCESS; +} + + + diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index 7cba6ddf0d..af78efff9e 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -391,6 +391,8 @@ int32_t ctgdGetClusterCacheNum(SCatalog *pCtg, int32_t type) { return ctgdGetRentNum(&pCtg->dbRent); case CTG_DBG_STB_RENT_NUM: return ctgdGetRentNum(&pCtg->stbRent); + case CTG_DBG_VIEW_RENT_NUM: + return ctgdGetRentNum(&pCtg->viewRent); default: break; } diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 4a05510217..3a12d822f4 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -318,6 +318,21 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qDebug("Got svr ver from mnode"); break; } + case TDMT_MND_VIEW_META: { + if (TSDB_CODE_SUCCESS != rspCode) { + qError("error rsp for get view-meta, error:%s, viewFName:%s", tstrerror(rspCode), target); + CTG_ERR_RET(rspCode); + } + + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); + if (code) { + qError("Process get view-meta rsp failed, error:%s, viewFName:%s", tstrerror(code), target); + CTG_ERR_RET(code); + } + + qDebug("Got view-meta from mnode, viewFName:%s", target); + break; + } default: if (TSDB_CODE_SUCCESS != rspCode) { qError("Got error rsp, error:%s", tstrerror(rspCode)); @@ -1388,3 +1403,60 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** ou return TSDB_CODE_SUCCESS; } + +int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pName, SViewMetaOutput* out, + SCtgTaskReq* tReq) { + char* msg = NULL; + int32_t msgLen = 0; + int32_t reqType = TDMT_MND_VIEW_META; + SCtgTask* pTask = tReq ? tReq->pTask : NULL; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + char fullName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pName, fullName); + + ctgDebug("try to get view info from mnode, viewFName:%s", fullName); + + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](fullName, &msg, 0, &msgLen, mallocFp); + if (code) { + ctgError("Build view-meta msg failed, code:%x, viewFName:%s", code, fullName); + CTG_ERR_RET(code); + } + + if (pTask) { + void* pOut = taosMemoryCalloc(1, POINTER_BYTES); + if (NULL == pOut) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, fullName)); + +#if CTG_BATCH_FETCH + CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen)); +#else + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen)); +#endif + } + + SRpcMsg rpcMsg = { + .msgType = reqType, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp); + + CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, fullName)); + + rpcFreeCont(rpcRsp.pCont); + + return TSDB_CODE_SUCCESS; +} + + diff --git a/source/libs/catalog/src/ctgRent.c b/source/libs/catalog/src/ctgRent.c new file mode 100755 index 0000000000..ba7e02e333 --- /dev/null +++ b/source/libs/catalog/src/ctgRent.c @@ -0,0 +1,287 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "catalogInt.h" +#include "query.h" +#include "systable.h" +#include "tname.h" +#include "trpc.h" + +int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type, int32_t size) { + mgmt->slotRIdx = 0; + mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND; + mgmt->type = type; + mgmt->metaSize = size; + + size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum; + + mgmt->slots = taosMemoryCalloc(1, msgSize); + if (NULL == mgmt->slots) { + qError("calloc %d failed", (int32_t)msgSize); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + mgmt->rentCacheSize = msgSize; + + qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum); + + return TSDB_CODE_SUCCESS; +} + +int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) { + int16_t widx = abs((int)(id % mgmt->slotNum)); + + SCtgRentSlot *slot = &mgmt->slots[widx]; + int32_t code = 0; + + CTG_LOCK(CTG_WRITE, &slot->lock); + if (NULL == slot->meta) { + slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size); + if (NULL == slot->meta) { + qError("taosArrayInit %d failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, + mgmt->type); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + } + + if (NULL == taosArrayPush(slot->meta, meta)) { + qError("taosArrayPush meta to rent failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + mgmt->rentCacheSize += size; + slot->needSort = true; + + qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); + +_return: + + CTG_UNLOCK(CTG_WRITE, &slot->lock); + CTG_RET(code); +} + +int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare, + __compar_fn_t searchCompare) { + int16_t widx = abs((int)(id % mgmt->slotNum)); + + SCtgRentSlot *slot = &mgmt->slots[widx]; + int32_t code = 0; + + CTG_LOCK(CTG_WRITE, &slot->lock); + if (NULL == slot->meta) { + qDebug("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + if (slot->needSort) { + qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, + (int32_t)taosArrayGetSize(slot->meta)); + taosArraySort(slot->meta, sortCompare); + slot->needSort = false; + qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); + } + + void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ); + if (NULL == orig) { + qDebug("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, + (int32_t)taosArrayGetSize(slot->meta)); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + memcpy(orig, meta, size); + + qDebug("meta in rent updated, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); + +_return: + + CTG_UNLOCK(CTG_WRITE, &slot->lock); + + if (code) { + qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%" PRIx64 ", slot idx:%d, type:%d", code, id, + widx, mgmt->type); + CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size)); + } + + CTG_RET(code); +} + +int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) { + int16_t widx = abs((int)(id % mgmt->slotNum)); + + SCtgRentSlot *slot = &mgmt->slots[widx]; + int32_t code = 0; + + CTG_LOCK(CTG_WRITE, &slot->lock); + if (NULL == slot->meta) { + qError("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + if (slot->needSort) { + taosArraySort(slot->meta, sortCompare); + slot->needSort = false; + qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type); + } + + int32_t idx = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ); + if (idx < 0) { + qError("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + taosArrayRemove(slot->meta, idx); + mgmt->rentCacheSize -= mgmt->metaSize; + + qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); + +_return: + + CTG_UNLOCK(CTG_WRITE, &slot->lock); + + CTG_RET(code); +} + +int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) { + int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1); + if (ridx >= mgmt->slotNum) { + ridx %= mgmt->slotNum; + atomic_store_16(&mgmt->slotRIdx, ridx); + } + + SCtgRentSlot *slot = &mgmt->slots[ridx]; + int32_t code = 0; + + CTG_LOCK(CTG_READ, &slot->lock); + if (NULL == slot->meta) { + qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type); + *num = 0; + goto _return; + } + + size_t metaNum = taosArrayGetSize(slot->meta); + if (metaNum <= 0) { + qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type); + *num = 0; + goto _return; + } + + size_t msize = metaNum * size; + *res = taosMemoryMalloc(msize); + if (NULL == *res) { + qError("malloc %d failed", (int32_t)msize); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + void *meta = taosArrayGet(slot->meta, 0); + + memcpy(*res, meta, msize); + + *num = (uint32_t)metaNum; + + qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type); + +_return: + + CTG_UNLOCK(CTG_READ, &slot->lock); + + CTG_RET(code); +} + +int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) { + while (true) { + int64_t msec = taosGetTimestampMs(); + int64_t lsec = atomic_load_64(&mgmt->lastReadMsec); + if ((msec - lsec) < CTG_RENT_SLOT_SECOND * 1000) { + *res = NULL; + *num = 0; + qDebug("too short time period to get expired meta, type:%d", mgmt->type); + return TSDB_CODE_SUCCESS; + } + + if (lsec != atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lsec, msec)) { + continue; + } + + break; + } + + CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size)); + + return TSDB_CODE_SUCCESS; +} + +void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) { + if (NULL == dbCache->stbCache) { + return; + } + + void *pIter = taosHashIterate(dbCache->stbCache, NULL); + while (pIter) { + uint64_t *suid = NULL; + suid = taosHashGetKey(pIter, NULL); + + if (TSDB_CODE_SUCCESS == + ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) { + ctgDebug("stb removed from rent, suid:0x%" PRIx64, *suid); + } + + pIter = taosHashIterate(dbCache->stbCache, pIter); + } +} + +int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid, + SCtgTbCache *pCache) { + SSTableVersion metaRent = {.dbId = dbId, .suid = suid}; + if (pCache->pMeta) { + metaRent.sversion = pCache->pMeta->sversion; + metaRent.tversion = pCache->pMeta->tversion; + } + + if (pCache->pIndex) { + metaRent.smaVer = pCache->pIndex->version; + } + + tstrncpy(metaRent.dbFName, dbFName, sizeof(metaRent.dbFName)); + tstrncpy(metaRent.stbName, tbName, sizeof(metaRent.stbName)); + + CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableVersion), + ctgStbVersionSortCompare, ctgStbVersionSearchCompare)); + + ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", dbFName, dbId, + tbName, suid, metaRent.sversion, metaRent.tversion, metaRent.smaVer); + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgUpdateRentViewVersion(SCatalog *pCtg, char *dbFName, char *viewName, uint64_t dbId, uint64_t viewId, + SCtgViewCache *pCache) { + SViewVersion metaRent = {.dbId = dbId, .viewId = viewId}; + metaRent.version = pCache->pMeta->version; + + tstrncpy(metaRent.dbFName, dbFName, sizeof(metaRent.dbFName)); + tstrncpy(metaRent.viewName, viewName, sizeof(metaRent.viewName)); + + CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->viewRent, &metaRent, metaRent.viewId, sizeof(SViewVersion), + ctgViewVersionSortCompare, ctgViewVersionSearchCompare)); + + ctgDebug("db %s,0x%" PRIx64 " view %s,0x%" PRIx64 " version %d updated to viewRent", dbFName, dbId, viewName, viewId, metaRent.version); + + return TSDB_CODE_SUCCESS; +} + + + + diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index dab007aa47..fd87caaa5d 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -93,6 +93,10 @@ char* ctgTaskTypeStr(CTG_TASK_TYPE type) { return "[bget table meta]"; case CTG_TASK_GET_TB_HASH_BATCH: return "[bget table hash]"; + case CTG_TASK_GET_TB_TAG: + return "[get table tag]"; + case CTG_TASK_GET_VIEW: + return "[get view]"; default: return "unknown"; } @@ -304,6 +308,7 @@ void ctgFreeInstUserCache(SHashObj* pUserCache) { void ctgFreeHandleImpl(SCatalog* pCtg) { ctgFreeMetaRent(&pCtg->dbRent); ctgFreeMetaRent(&pCtg->stbRent); + ctgFreeMetaRent(&pCtg->viewRent); ctgFreeInstDbCache(pCtg->dbCache); ctgFreeInstUserCache(pCtg->userCache); @@ -320,6 +325,7 @@ void ctgFreeHandle(SCatalog* pCtg) { ctgFreeMetaRent(&pCtg->dbRent); ctgFreeMetaRent(&pCtg->stbRent); + ctgFreeMetaRent(&pCtg->viewRent); ctgFreeInstDbCache(pCtg->dbCache); ctgFreeInstUserCache(pCtg->userCache); @@ -405,12 +411,14 @@ void ctgClearHandle(SCatalog* pCtg) { ctgFreeMetaRent(&pCtg->dbRent); ctgFreeMetaRent(&pCtg->stbRent); + ctgFreeMetaRent(&pCtg->viewRent); ctgFreeInstDbCache(pCtg->dbCache); ctgFreeInstUserCache(pCtg->userCache); ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbCacheInfo)); ctgMetaRentInit(&pCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE, sizeof(SSTableVersion)); + ctgMetaRentInit(&pCtg->viewRent, gCtgMgmt.cfg.viewRentSec, CTG_RENT_VIEW, sizeof(SViewVersion)); pCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -513,6 +521,12 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) { taosMemoryFreeClear(pCtx->out); break; } + case TDMT_MND_VIEW_META: { + SViewMetaRsp* pOut = (SViewMetaRsp*)pCtx->out; + taosMemoryFree(pOut->querySql); + taosMemoryFreeClear(pCtx->out); + break; + } default: qError("invalid reqType %d", pCtx->reqType); break; @@ -1202,6 +1216,17 @@ int32_t ctgDbCacheInfoSearchCompare(const void* key1, const void* key2) { } } +int32_t ctgViewVersionSearchCompare(const void* key1, const void* key2) { + if (*(uint64_t*)key1 < ((SViewVersion*)key2)->viewId) { + return -1; + } else if (*(uint64_t*)key1 > ((SViewVersion*)key2)->viewId) { + return 1; + } else { + return 0; + } +} + + int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) { if (((SSTableVersion*)key1)->suid < ((SSTableVersion*)key2)->suid) { return -1; @@ -1222,6 +1247,17 @@ int32_t ctgDbCacheInfoSortCompare(const void* key1, const void* key2) { } } +int32_t ctgViewVersionSortCompare(const void* key1, const void* key2) { + if (((SViewVersion*)key1)->viewId < ((SViewVersion*)key2)->viewId) { + return -1; + } else if (((SViewVersion*)key1)->viewId > ((SViewVersion*)key2)->viewId) { + return 1; + } else { + return 0; + } +} + + int32_t ctgMakeVgArray(SDBVgInfo* dbInfo) { if (NULL == dbInfo) { return TSDB_CODE_SUCCESS; @@ -1758,6 +1794,15 @@ uint64_t ctgGetTbIndexCacheSize(STableIndex *pIndex) { return sizeof(*pIndex) + pIndex->indexSize; } +uint64_t ctgGetViewMetaCacheSize(SViewMeta *pMeta) { + if (NULL == pMeta) { + return 0; + } + + return sizeof(*pMeta) + strlen(pMeta->querySql) + 1; +} + + FORCE_INLINE uint64_t ctgGetTbMetaCacheSize(STableMeta *pMeta) { if (NULL == pMeta) { return 0; @@ -1870,6 +1915,7 @@ uint64_t ctgGetClusterCacheSize(SCatalog *pCtg) { cacheSize += pCtg->dbRent.rentCacheSize; cacheSize += pCtg->stbRent.rentCacheSize; + cacheSize += pCtg->viewRent.rentCacheSize; return cacheSize; } @@ -1958,3 +2004,20 @@ void ctgGetGlobalCacheSize(uint64_t *pSize) { } } +int32_t ctgBuildViewNullRes(SCtgTask* pTask, SCtgViewsCtx* pCtx) { + int32_t dbNum = taosArrayGetSize(pCtx->pNames); + for (int32_t i = 0; i < dbNum; ++i) { + STablesReq* pReq = taosArrayGet(pCtx->pNames, i); + int32_t viewNum = taosArrayGetSize(pReq->pTables); + + ctgDebug("start to check views in db %s, viewNum %d", pReq->dbFName, viewNum); + + for (int32_t m = 0; m < viewNum; ++m) { + taosArrayPush(pCtx->pResList, &(SMetaData){0}); + } + } + + return TSDB_CODE_SUCCESS; +} + + diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index bf6ff87c55..9b38966ed8 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -658,26 +658,17 @@ static int32_t collectMetaKeyFromGrant(SCollectMetaKeyCxt* pCxt, SGrantStmt* pSt static int32_t collectMetaKeyFromCreateViewStmt(SCollectMetaKeyCxt* pCxt, SCreateViewStmt* pStmt) { int32_t code = - reserveViewMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache); - if (TSDB_CODE_SUCCESS == code) { - code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery); - } + reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache); if (TSDB_CODE_SUCCESS == code) { code = reserveUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, NULL, AUTH_TYPE_WRITE, pCxt->pMetaCache); } - return code; } static int32_t collectMetaKeyFromDropViewStmt(SCollectMetaKeyCxt* pCxt, SDropViewStmt* pStmt) { - int32_t code = - reserveViewMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache); - if (TSDB_CODE_SUCCESS == code) { - code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, pStmt->viewName, AUTH_TYPE_WRITE, + int32_t code = reserveUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, NULL, AUTH_TYPE_WRITE, pCxt->pMetaCache); - } - return code; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ae57fee448..6656f2fd95 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -7267,6 +7267,13 @@ static int32_t validateCreateView(STranslateContext* pCxt, SCreateViewStmt* pStm return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type"); } + STableMeta* pMetaCache = NULL; + int32_t code = getTableMeta(pCxt, pStmt->dbName, pStmt->viewName, &pMetaCache); + if (TSDB_CODE_SUCCESS == code) { + taosMemoryFreeClear(pMetaCache); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is conflict with table"); + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index dec4bd2e51..2608a2ea33 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -691,9 +691,11 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog if (TSDB_CODE_SUCCESS == code) { code = buildTableReq(pMetaCache->pTableCfg, &pCatalogReq->pTableCfg); } +#ifdef TD_ENTERPRISE if (TSDB_CODE_SUCCESS == code) { - code = buildTableReqFromDb(pMetaCache->pViews, &pCatalogReq->pView); + code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pView); } +#endif pCatalogReq->dNodeRequired = pMetaCache->dnodeRequired; return code; } @@ -886,14 +888,6 @@ int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCac return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pTableMeta); } -int32_t reserveViewMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) { - return reserveTableReqInDbCache(acctId, pDb, pTable, &pMetaCache->pViews); -} - -int32_t reserveViewMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) { - return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pViews); -} - int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) { char fullName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pName, fullName); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 01b136d5e0..bdb040d0c8 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -286,6 +286,24 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t return TSDB_CODE_SUCCESS; } +int32_t queryBuildGetViewMetaMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) { + if (NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SViewMetaReq req = {0}; + strncpy(req.fullname, input, sizeof(req.fullname) - 1); + + int32_t bufLen = tSerializeSViewMetaReq(NULL, 0, &req); + void *pBuf = (*mallcFp)(bufLen); + tSerializeSViewMetaReq(pBuf, bufLen, &req); + + *msg = pBuf; + *msgLen = bufLen; + + return TSDB_CODE_SUCCESS; +} + int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { SUseDbOutput *pOut = output; SUseDbRsp usedbRsp = {0}; @@ -637,6 +655,25 @@ int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) { return TSDB_CODE_SUCCESS; } +int32_t queryProcessGetViewMetaRsp(void *output, char *msg, int32_t msgSize) { + if (NULL == output || NULL == msg || msgSize <= 0) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SViewMetaRsp *out = taosMemoryCalloc(1, sizeof(SViewMetaRsp)); + if (tDeserializeSViewMetaRsp(msg, msgSize, out) != 0) { + qError("tDeserializeSViewMetaRsp failed, msgSize:%d", msgSize); + tFreeSViewMetaRsp(out); + taosMemoryFree(out); + return TSDB_CODE_INVALID_MSG; + } + + *(SViewMetaRsp **)output = out; + + return TSDB_CODE_SUCCESS; +} + + void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg; @@ -651,6 +688,7 @@ void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryBuildGetTbCfgMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryBuildGetTbCfgMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryBuildGetSerVerMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_VIEW_META)] = queryBuildGetViewMetaMsg; queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; @@ -665,6 +703,7 @@ void initQueryModuleMsgHandle() { queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryProcessGetSerVerRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_VIEW_META)] = queryProcessGetViewMetaRsp; } #pragma GCC diagnostic pop diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 5e985ae25b..9428c676be 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -588,6 +588,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal err TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream query") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_QUERY_MISMATCH, "Columns number mismatch with query result") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is conflict with table") //planner TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "Planner internal error")