feat: add view meta processing

This commit is contained in:
dapan1121 2023-09-28 11:44:38 +08:00
parent a61d90f672
commit 0a0f512023
22 changed files with 1054 additions and 264 deletions

View File

@ -3912,7 +3912,6 @@ typedef struct {
int8_t precision;
int32_t numOfCols;
SSchema* pSchema;
SRWLatch lock;
} SCMCreateViewReq;
int32_t tSerializeSCMCreateViewReq(void* buf, int32_t bufLen, const SCMCreateViewReq* pReq);
@ -3931,6 +3930,25 @@ int32_t tSerializeSCMDropViewReq(void* buf, int32_t bufLen, const SCMDropViewReq
int32_t tDeserializeSCMDropViewReq(void* buf, int32_t bufLen, SCMDropViewReq* pReq);
void tFreeSCMDropViewReq(SCMDropViewReq* pReq);
typedef struct {
char fullname[TSDB_VIEW_FNAME_LEN];
} SViewMetaReq;
int32_t tSerializeSViewMetaReq(void* buf, int32_t bufLen, const SViewMetaReq* pReq);
int32_t tDeserializeSViewMetaReq(void* buf, int32_t bufLen, SViewMetaReq* pReq);
typedef struct {
char name[TSDB_VIEW_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t dbId;
uint64_t viewId;
char* querySql;
int32_t version;
} SViewMetaRsp;
int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pRsp);
int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp);
void tFreeSViewMetaRsp(SViewMetaRsp* pRsp);
#pragma pack(pop)
#ifdef __cplusplus

View File

@ -179,6 +179,7 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE, "retrieve-ip-white", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_WHITELIST, "get-user-whitelist", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_VIEW_META, "view-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)

View File

@ -39,6 +39,7 @@ enum {
CTG_DBG_STB_NUM,
CTG_DBG_DB_RENT_NUM,
CTG_DBG_STB_RENT_NUM,
CTG_DBG_VIEW_RENT_NUM,
};
typedef enum {
@ -111,7 +112,7 @@ typedef struct SMetaData {
SArray* pTableCfg; // pRes = STableCfg*
SArray* pTableTag; // pRes = SArray<STagVal>*
SArray* pDnodeList; // pRes = SArray<SEpSet>*
SArray* pView; // pRes = SViewInfo*
SArray* pView; // pRes = SViewMeta*
SMetaRes* pSvrVer; // pRes = char*
} SMetaData;
@ -121,12 +122,13 @@ typedef struct SCatalogCfg {
uint32_t maxUserCacheNum;
uint32_t dbRentSec;
uint32_t stbRentSec;
uint32_t viewRentSec;
} SCatalogCfg;
typedef struct SSTableVersion {
char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
uint64_t dbId;
int64_t dbId;
uint64_t suid;
int32_t sversion;
int32_t tversion;
@ -142,6 +144,15 @@ typedef struct SDbCacheInfo {
int64_t stateTs;
} SDbCacheInfo;
typedef struct SViewVersion {
char dbFName[TSDB_DB_FNAME_LEN];
char viewName[TSDB_VIEW_NAME_LEN];
int64_t dbId;
uint64_t viewId;
int32_t version;
} SViewVersion;
typedef struct STbSVersion {
char* tbFName;
int32_t sver;

View File

@ -119,6 +119,12 @@ typedef struct STableMeta {
} STableMeta;
#pragma pack(pop)
typedef struct SViewMeta {
int32_t version;
uint64_t viewId;
char* querySql;
} SViewMeta;
typedef struct SDBVgInfo {
int32_t vgVersion;
int16_t hashPrefix;
@ -148,6 +154,15 @@ typedef struct STableMetaOutput {
STableMeta* tbMeta;
} STableMetaOutput;
typedef struct SViewMetaOutput {
char name[TSDB_VIEW_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
char* querySql;
int8_t precision;
int32_t numOfCols;
SSchema* pSchema;
} SViewMetaOutput;
typedef struct SDataBuf {
int32_t msgType;
void* pData;

View File

@ -727,6 +727,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_IP_RANGE TAOS_DEF_ERROR_CODE(0, 0x266B)
#define TSDB_CODE_PAR_INVALID_VIEW_QUERY TAOS_DEF_ERROR_CODE(0, 0x266C)
#define TSDB_CODE_PAR_COL_QUERY_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x266D)
#define TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE TAOS_DEF_ERROR_CODE(0, 0x266E)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
//planner

View File

@ -8536,5 +8536,76 @@ void tFreeSCMDropViewReq(SCMDropViewReq* pReq) {
taosMemoryFree(pReq->sql);
}
int32_t tSerializeSViewMetaReq(void* buf, int32_t bufLen, const SViewMetaReq* pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->fullname) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSViewMetaReq(void* buf, int32_t bufLen, SViewMetaReq* pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->fullname) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->name) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1;
if (tEncodeU64(&encoder, pRsp->dbId) < 0) return -1;
if (tEncodeU64(&encoder, pRsp->viewId) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->querySql) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->version) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->name) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->dbId) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->viewId) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pRsp->querySql) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSViewMetaRsp(SViewMetaRsp* pRsp) {
if (NULL == pRsp) {
return;
}
taosMemoryFree(pRsp->querySql);
}

View File

@ -187,6 +187,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_RESTORE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_VIEW, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_VIEW, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_VIEW_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;

View File

@ -729,7 +729,19 @@ void tFreeStreamObj(SStreamObj* pObj);
// } SStreamCheckpointObj;
typedef SCMCreateViewReq SViewObj;
typedef struct {
char fullname[TSDB_VIEW_FNAME_LEN];
char name[TSDB_VIEW_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
char* querySql;
uint64_t viewId;
uint64_t dbId;
int32_t version;
int8_t precision;
int32_t numOfCols;
SSchema* pSchema;
SRWLatch lock;
} SViewObj;
int32_t tEncodeSViewObj(SEncoder* pEncoder, const SViewObj* pObj);
int32_t tDecodeSViewObj(SDecoder* pDecoder, SViewObj* pObj, int32_t sver);

View File

@ -19,6 +19,7 @@
int32_t mndInitView(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_VIEW, mndProcessCreateViewReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_VIEW, mndProcessDropViewReq);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessGetViewMetaReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndRetrieveView);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndCancelGetNextView);
@ -76,6 +77,22 @@ int32_t mndProcessDropViewReq(SRpcMsg *pReq) {
#endif
}
static int32_t mndProcessViewMetaReq(SRpcMsg *pReq) {
#ifndef TD_ENTERPRISE
return TSDB_CODE_OPS_NOT_SUPPORT;
#else
SViewMetaReq req = {0};
if (tDeserializeSViewMetaReq(pReq->pCont, pReq->contLen, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
return mndProcessTableMetaReqImpl(&req, pReq);
#endif
}
int32_t mndRetrieveView(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
#if 0
SMnode *pMnode = pReq->info.node;

View File

@ -67,6 +67,7 @@ typedef enum {
CTG_CI_USER,
CTG_CI_UDF,
CTG_CI_SVR_VER,
CTG_CI_VIEW,
CTG_CI_MAX_VALUE,
} CTG_CACHE_ITEM;
@ -82,6 +83,7 @@ enum {
enum {
CTG_RENT_DB = 1,
CTG_RENT_STABLE,
CTG_RENT_VIEW,
};
enum {
@ -96,6 +98,7 @@ enum {
CTG_OP_UPDATE_VG_EPSET,
CTG_OP_UPDATE_TB_INDEX,
CTG_OP_DROP_TB_INDEX,
CTG_OP_UPDATE_VIEW_META,
CTG_OP_CLEAR_CACHE,
CTG_OP_MAX
};
@ -117,6 +120,7 @@ typedef enum {
CTG_TASK_GET_TB_META_BATCH,
CTG_TASK_GET_TB_HASH_BATCH,
CTG_TASK_GET_TB_TAG,
CTG_TASK_GET_VIEW,
} CTG_TASK_TYPE;
typedef enum {
@ -240,6 +244,14 @@ typedef struct SCtgUserCtx {
SUserAuthInfo user;
} SCtgUserCtx;
typedef struct SCtgViewsCtx {
int32_t fetchNum;
SArray* pNames;
SArray* pResList;
SArray* pFetchs;
} SCtgViewsCtx;
typedef STableIndexRsp STableIndex;
typedef struct SCtgTbCache {
@ -259,12 +271,19 @@ typedef struct SCtgCfgCache {
SDbCfgInfo* cfgInfo;
} SCtgCfgCache;
typedef struct SCtgViewCache {
SRWLatch viewLock;
SViewMeta* pMeta;
} SCtgViewCache;
typedef struct SCtgDBCache {
SRWLatch dbLock; // RC between destroy tbCache/stbCache and all reads
uint64_t dbId;
int8_t deleted;
SCtgVgCache vgCache;
SCtgCfgCache cfgCache;
SHashObj* viewCache; // key:viewname, value:SCtgViewCache
SHashObj* tbCache; // key:tbname, value:SCtgTbCache
SHashObj* stbCache; // key:suid, value:char*
uint64_t dbCacheNum[CTG_CI_MAX_VALUE];
@ -300,6 +319,7 @@ typedef struct SCatalog {
SHashObj* dbCache; // key:dbname, value:SCtgDBCache
SCtgRentMgmt dbRent;
SCtgRentMgmt stbRent;
SCtgRentMgmt viewRent;
SCtgCacheStat cacheStat;
} SCatalog;
@ -344,6 +364,7 @@ typedef struct SCtgJob {
int32_t tbIndexNum;
int32_t tbCfgNum;
int32_t svrVerNum;
int32_t viewNum;
} SCtgJob;
typedef struct SCtgMsgCtx {
@ -509,6 +530,12 @@ typedef struct SCtgUpdateEpsetMsg {
SEpSet epSet;
} SCtgUpdateEpsetMsg;
typedef struct SCtgUpdateViewMetaMsg {
SCatalog* pCtg;
SViewMetaRsp* pRsp;
} SCtgUpdateViewMetaMsg;
typedef struct SCtgCacheOperation {
int32_t opId;
void* data;
@ -686,10 +713,10 @@ typedef struct SCtgCacheItemInfo {
(CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || \
(CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_IS_BATCH_TASK(_taskType) ((CTG_TASK_GET_TB_META_BATCH == (_taskType)) || (CTG_TASK_GET_TB_HASH_BATCH == (_taskType)) || (CTG_TASK_GET_VIEW == (_taskType)))
#define CTG_GET_TASK_MSGCTX(_task, _id) \
(((CTG_TASK_GET_TB_META_BATCH == (_task)->type) || (CTG_TASK_GET_TB_HASH_BATCH == (_task)->type)) \
? taosArrayGet((_task)->msgCtxs, (_id)) \
: &(_task)->msgCtx)
(CTG_IS_BATCH_TASK((_task)->type) ? taosArrayGet((_task)->msgCtxs, (_id)) : &(_task)->msgCtx)
#define CTG_META_SIZE(pMeta) \
(sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))

View File

@ -739,11 +739,16 @@ int32_t catalogInit(SCatalogCfg* cfg) {
if (gCtgMgmt.cfg.stbRentSec == 0) {
gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
}
if (gCtgMgmt.cfg.viewRentSec == 0) {
gCtgMgmt.cfg.viewRentSec = CTG_DEFAULT_RENT_SECOND;
}
} else {
gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
gCtgMgmt.cfg.viewRentSec = CTG_DEFAULT_RENT_SECOND;
}
gCtgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT),
@ -828,6 +833,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbCacheInfo)));
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE, sizeof(SSTableVersion)));
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->viewRent, gCtgMgmt.cfg.viewRentSec, CTG_RENT_VIEW, sizeof(SViewVersion)));
clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
false, HASH_ENTRY_LOCK);

View File

@ -53,7 +53,6 @@ int32_t ctgInitGetTbMetaTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
}
int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SName* name = (SName*)param;
SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_META_BATCH;
@ -184,7 +183,6 @@ int32_t ctgInitGetTbHashTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
}
int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SName* name = (SName*)param;
SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_HASH_BATCH;
@ -417,6 +415,30 @@ int32_t ctgInitGetTbTagTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetViewsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SCtgTask task = {0};
task.type = CTG_TASK_GET_VIEW;
task.taskId = taskIdx;
task.pJob = pJob;
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgViewsCtx));
if (NULL == task.taskCtx) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgViewsCtx* ctx = task.taskCtx;
ctx->pNames = param;
ctx->pResList = taosArrayInit(pJob->viewNum, sizeof(SMetaRes));
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%lu, viewNum:%d", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->viewNum);
return TSDB_CODE_SUCCESS;
}
int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, const SCatalogReq* pReq) {
SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
@ -547,9 +569,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
int32_t tbTagNum = (int32_t)taosArrayGetSize(pReq->pTableTag);
int32_t viewNum = (int32_t)ctgGetTablesReqNum(pReq->pView);
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum +
userNum + dbInfoNum + tbIndexNum + tbCfgNum + tbTagNum;
userNum + dbInfoNum + tbIndexNum + tbCfgNum + tbTagNum + viewNum;
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
if (NULL == *job) {
@ -580,6 +603,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
pJob->tbCfgNum = tbCfgNum;
pJob->svrVerNum = svrVerNum;
pJob->tbTagNum = tbTagNum;
pJob->viewNum = viewNum;
#if CTG_BATCH_FETCH
pJob->pBatchs =
@ -668,6 +692,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_USER, user, NULL));
}
if (tbHashNum > 0) {
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_VIEW, pReq->pView, NULL));
}
if (qnodeNum) {
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_QNODE, NULL, NULL));
}
@ -1773,6 +1801,65 @@ _return:
CTG_RET(code);
}
int32_t ctgHandleGetViewsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SCtgViewsCtx* ctx = (SCtgViewsCtx*)pTask->taskCtx;
SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
SName* pName = ctgGetFetchName(ctx->pNames, pFetch);
int32_t flag = pFetch->flag;
int32_t* vgId = &pFetch->vgId;
bool taskDone = false;
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
SViewMetaRsp* pRsp = (SViewMetaRsp**)pMsgCtx->out;
SViewMeta* pViewMeta = taosMemoryCalloc(1, sizeof(SViewMeta));
if (NULL == pViewMeta) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
pViewMeta->querySql = strdup(pRsp->querySql);
if (NULL == pViewMeta->querySql) {
taosMemoryFree(pViewMeta);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
ctgUpdateViewMetaToCache(pCtg, pRsp, false);
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
pRes->code = 0;
pRes->pRes = pViewMeta;
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pResList);
taskDone = true;
}
_return:
if (code) {
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
pRes->code = code;
pRes->pRes = NULL;
ctgTaskError("Get view %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname,
tstrerror(code));
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pResList);
taskDone = true;
}
}
if (pTask->res && taskDone) {
ctgHandleTaskEnd(pTask, code);
}
CTG_RET(code);
}
int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq* tReq, int32_t flag, SName* pName, int32_t* vgId) {
SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg;
@ -2355,6 +2442,59 @@ int32_t ctgLaunchGetSvrVerTask(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgLaunchGetViewsTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgViewsCtx* pCtx = (SCtgViewsCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob;
bool tbMetaDone = false;
ctgIsTaskDone(pJob, CTG_TASK_GET_TB_META_BATCH, &tbMetaDone);
if (tbMetaDone) {
CTG_ERR_RET(ctgBuildViewNullRes(pTask, pCtx));
TSWAP(pTask->res, pCtx->pResList);
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS;
}
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
int32_t fetchIdx = 0;
int32_t baseResIdx = 0;
for (int32_t i = 0; i < dbNum; ++i) {
STablesReq* pReq = taosArrayGet(pCtx->pNames, i);
ctgDebug("start to check views in db %s, viewNum %ld", pReq->dbFName, taosArrayGetSize(pReq->pTables));
CTG_ERR_RET(ctgGetViewsFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables));
baseResIdx += taosArrayGetSize(pReq->pTables);
}
pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs);
if (pCtx->fetchNum <= 0) {
TSWAP(pTask->res, pCtx->pResList);
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS;
}
pTask->msgCtxs = taosArrayInit_s(sizeof(SCtgMsgCtx), pCtx->fetchNum);
for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
SName* pName = ctgGetFetchName(pCtx->pNames, pFetch);
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = pFetch->fetchIdx;
CTG_ERR_RET(ctgGetViewInfoFromMnode(pCtg, pConn, pName, NULL, &tReq));
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgRelaunchGetTbMetaTask(SCtgTask* pTask) {
ctgResetTbMetaTask(pTask);
@ -2470,6 +2610,7 @@ SCtgAsyncFps gCtgAsyncFps[] = {
{ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetasRsp, ctgDumpTbMetasRes, NULL, NULL},
{ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL},
{ctgInitGetTbTagTask, ctgLaunchGetTbTagTask, ctgHandleGetTbTagRsp, ctgDumpTbTagRes, NULL, NULL},
{ctgInitGetViewsTask, ctgLaunchGetViewsTask, ctgHandleGetViewsRsp, ctgDumpViewsRes, NULL, NULL},
};
int32_t ctgMakeAsyncRes(SCtgJob* pJob) {
@ -2541,6 +2682,29 @@ _return:
CTG_RET(code);
}
void ctgIsTaskDone(SCtgJob* pJob, CTG_TASK_TYPE type, bool* done) {
SCtgTask* pTask = NULL;
*done = true;
CTG_LOCK(CTG_READ, &pJob->taskLock);
int32_t taskNum = taosArrayGetSize(pJob->pTasks);
for (int32_t i = 0; i < taskNum; ++i) {
pTask = taosArrayGet(pJob->pTasks, i);
if (type != pTask->type) {
continue;
}
if (pTask->status != CTG_TASK_DONE) {
*done = false;
break;
}
}
CTG_UNLOCK(CTG_READ, &pJob->taskLock);
}
SCtgTask* ctgGetTask(SCtgJob* pJob, int32_t taskId) {
int32_t taskNum = taosArrayGetSize(pJob->pTasks);

View File

@ -30,6 +30,7 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v
{CTG_OP_UPDATE_VG_EPSET, "update epset", ctgOpUpdateEpset},
{CTG_OP_UPDATE_TB_INDEX, "update tbIndex", ctgOpUpdateTbIndex},
{CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex},
{CTG_OP_UPDATE_VIEW_META, "update viewMeta", ctgOpUpdateViewMeta},
{CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}};
SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = {
@ -171,6 +172,18 @@ void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *
}
}
void ctgReleaseViewMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgViewCache *pCache) {
if (pCache && dbCache) {
CTG_UNLOCK(CTG_READ, &pCache->viewLock);
taosHashRelease(dbCache->viewCache, pCache);
}
if (dbCache) {
ctgReleaseDBCache(pCtg, dbCache);
}
}
void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->indexLock);
@ -1266,205 +1279,31 @@ _return:
CTG_RET(code);
}
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type, int32_t size) {
mgmt->slotRIdx = 0;
mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
mgmt->type = type;
mgmt->metaSize = size;
int32_t ctgUpdateViewMetaEnqueue(SCatalog *pCtg, SViewMetaRsp *pRsp, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_VIEW_META;
op->syncOp = syncOp;
size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum;
mgmt->slots = taosMemoryCalloc(1, msgSize);
if (NULL == mgmt->slots) {
qError("calloc %d failed", (int32_t)msgSize);
SCtgUpdateViewMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateViewMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateViewMetaMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
mgmt->rentCacheSize = msgSize;
qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);
return TSDB_CODE_SUCCESS;
}
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
int16_t widx = abs((int)(id % mgmt->slotNum));
SCtgRentSlot *slot = &mgmt->slots[widx];
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
if (NULL == slot->meta) {
qError("taosArrayInit %d failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx,
mgmt->type);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
char *p = strchr(pRsp->dbFName, '.');
if (p && IS_SYS_DBNAME(p + 1)) {
int32_t len = strlen(p + 1);
memmove(pRsp->dbFName, p + 1, len >= TSDB_DB_FNAME_LEN ? TSDB_DB_FNAME_LEN - 1 : len);
}
if (NULL == taosArrayPush(slot->meta, meta)) {
qError("taosArrayPush meta to rent failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
msg->pCtg = pCtg;
msg->pRsp = pRsp;
mgmt->rentCacheSize += size;
slot->needSort = true;
op->data = msg;
qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
_return:
CTG_UNLOCK(CTG_WRITE, &slot->lock);
CTG_RET(code);
}
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare,
__compar_fn_t searchCompare) {
int16_t widx = abs((int)(id % mgmt->slotNum));
SCtgRentSlot *slot = &mgmt->slots[widx];
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
qDebug("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (slot->needSort) {
qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type,
(int32_t)taosArrayGetSize(slot->meta));
taosArraySort(slot->meta, sortCompare);
slot->needSort = false;
qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
}
void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ);
if (NULL == orig) {
qDebug("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type,
(int32_t)taosArrayGetSize(slot->meta));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
memcpy(orig, meta, size);
qDebug("meta in rent updated, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
_return:
CTG_UNLOCK(CTG_WRITE, &slot->lock);
if (code) {
qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%" PRIx64 ", slot idx:%d, type:%d", code, id,
widx, mgmt->type);
CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
}
CTG_RET(code);
}
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
int16_t widx = abs((int)(id % mgmt->slotNum));
SCtgRentSlot *slot = &mgmt->slots[widx];
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
qError("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (slot->needSort) {
taosArraySort(slot->meta, sortCompare);
slot->needSort = false;
qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
}
int32_t idx = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ);
if (idx < 0) {
qError("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
taosArrayRemove(slot->meta, idx);
mgmt->rentCacheSize -= mgmt->metaSize;
qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
_return:
CTG_UNLOCK(CTG_WRITE, &slot->lock);
CTG_RET(code);
}
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
if (ridx >= mgmt->slotNum) {
ridx %= mgmt->slotNum;
atomic_store_16(&mgmt->slotRIdx, ridx);
}
SCtgRentSlot *slot = &mgmt->slots[ridx];
int32_t code = 0;
CTG_LOCK(CTG_READ, &slot->lock);
if (NULL == slot->meta) {
qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
*num = 0;
goto _return;
}
size_t metaNum = taosArrayGetSize(slot->meta);
if (metaNum <= 0) {
qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
*num = 0;
goto _return;
}
size_t msize = metaNum * size;
*res = taosMemoryMalloc(msize);
if (NULL == *res) {
qError("malloc %d failed", (int32_t)msize);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
void *meta = taosArrayGet(slot->meta, 0);
memcpy(*res, meta, msize);
*num = (uint32_t)metaNum;
qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);
_return:
CTG_UNLOCK(CTG_READ, &slot->lock);
CTG_RET(code);
}
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
while (true) {
int64_t msec = taosGetTimestampMs();
int64_t lsec = atomic_load_64(&mgmt->lastReadMsec);
if ((msec - lsec) < CTG_RENT_SLOT_SECOND * 1000) {
*res = NULL;
*num = 0;
qDebug("too short time period to get expired meta, type:%d", mgmt->type);
return TSDB_CODE_SUCCESS;
}
if (lsec != atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lsec, msec)) {
continue;
}
break;
}
CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));
CTG_ERR_RET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
}
@ -1522,25 +1361,6 @@ _return:
CTG_RET(code);
}
void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) {
if (NULL == dbCache->stbCache) {
return;
}
void *pIter = taosHashIterate(dbCache->stbCache, NULL);
while (pIter) {
uint64_t *suid = NULL;
suid = taosHashGetKey(pIter, NULL);
if (TSDB_CODE_SUCCESS ==
ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) {
ctgDebug("stb removed from rent, suid:0x%" PRIx64, *suid);
}
pIter = taosHashIterate(dbCache->stbCache, pIter);
}
}
int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *dbFName) {
uint64_t dbId = dbCache->dbId;
@ -1609,30 +1429,6 @@ int32_t ctgGetAddDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCt
return TSDB_CODE_SUCCESS;
}
int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid,
SCtgTbCache *pCache) {
SSTableVersion metaRent = {.dbId = dbId, .suid = suid};
if (pCache->pMeta) {
metaRent.sversion = pCache->pMeta->sversion;
metaRent.tversion = pCache->pMeta->tversion;
}
if (pCache->pIndex) {
metaRent.smaVer = pCache->pIndex->version;
}
tstrncpy(metaRent.dbFName, dbFName, sizeof(metaRent.dbFName));
tstrncpy(metaRent.stbName, tbName, sizeof(metaRent.stbName));
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableVersion),
ctgStbVersionSortCompare, ctgStbVersionSearchCompare));
ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", dbFName, dbId,
tbName, suid, metaRent.sversion, metaRent.tversion, metaRent.smaVer);
return TSDB_CODE_SUCCESS;
}
int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName,
STableMeta *meta, int32_t metaSize) {
if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) {
@ -1785,6 +1581,63 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNa
return TSDB_CODE_SUCCESS;
}
int32_t ctgWriteViewMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *viewName, SViewMeta *pMeta) {
if (NULL == dbCache->viewCache) {
ctgWarn("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
CTG_ERR_JRET(TSDB_CODE_CTG_DB_DROPPED);
}
SCtgViewCache *pCache = taosHashGet(dbCache->viewCache, viewName, strlen(viewName));
if (NULL == pCache) {
SCtgViewCache cache = {0};
cache.pMeta = pMeta;
if (taosHashPut(dbCache->viewCache, viewName, strlen(viewName), &cache, sizeof(cache)) != 0) {
ctgError("taosHashPut new tbCache failed, viewName:%s", viewName);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
atomic_add_fetch_64(&dbCache->dbCacheSize, strlen(viewName) + sizeof(SCtgViewCache) + ctgGetViewMetaCacheSize(pMeta));
CTG_DB_NUM_INC(CTG_CI_VIEW);
pMeta = NULL;
ctgDebug("view %s meta updated to cache, ver:%d", viewName, pMeta->version);
CTG_ERR_RET(ctgUpdateRentViewVersion(pCtg, dbFName, viewName, dbCache->dbId, pMeta->viewId, &cache));
return TSDB_CODE_SUCCESS;
}
CTG_LOCK(CTG_WRITE, &pCache->viewLock);
if (pCache->pMeta) {
atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pCache->pMeta));
taosMemoryFree(pCache->pMeta->querySql);
taosMemoryFree(pCache->pMeta);
}
pCache->pMeta = pMeta;
CTG_UNLOCK(CTG_WRITE, &pCache->viewLock);
atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbIndexCacheSize(pMeta));
pMeta = NULL;
ctgDebug("view %s meta updated to cache, ver:%d", viewName, pMeta->version);
CTG_ERR_RET(ctgUpdateRentViewVersion(pCtg, dbFName, viewName, dbCache->dbId, pMeta->viewId, pCache));
_return:
if (pMeta) {
taosMemoryFree(pMeta->querySql);
taosMemoryFree(pMeta);
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool syncReq) {
STableMetaOutput *pOutput = NULL;
int32_t code = 0;
@ -1802,6 +1655,11 @@ _return:
CTG_RET(code);
}
int32_t ctgUpdateViewMetaToCache(SCatalog *pCtg, SViewMetaRsp *pRsp, bool syncReq) {
CTG_RET(ctgUpdateViewMetaEnqueue(pCtg, pRsp, syncReq));
}
void ctgClearAllHandles(void) {
SCatalog *pCtg = NULL;
@ -2416,6 +2274,46 @@ _return:
CTG_RET(code);
}
int32_t ctgOpUpdateViewMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgUpdateViewMetaMsg *msg = operation->data;
SCatalog *pCtg = msg->pCtg;
SViewMetaRsp *pRsp = msg->pRsp;
SCtgDBCache *dbCache = NULL;
if (pCtg->stopUpdate) {
goto _return;
}
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pRsp->dbFName, pRsp->dbId, &dbCache));
if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pRsp->dbFName, pRsp->dbId);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
SViewMeta *pMeta = taosMemoryCalloc(1, sizeof(SViewMeta));
if (NULL == pMeta) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
pMeta->querySql = strdup(pRsp->querySql);
if (NULL == pMeta->querySql) {
taosMemoryFree(pMeta);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
pMeta->viewId = pRsp->viewId;
pMeta->version = pRsp->version;
CTG_ERR_JRET(ctgWriteViewMetaToCache(pCtg, dbCache, pRsp->dbFName, pRsp->name, pMeta));
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
void ctgClearFreeCache(SCtgCacheOperation *operation) {
SCtgClearCacheMsg *msg = operation->data;
SCatalog *pCtg = msg->pCtg;
@ -2776,6 +2674,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
CTG_LOCK(CTG_READ, &pCache->metaLock);
if (NULL == pCache->pMeta) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArrayPush(ctx->pResList, &(SMetaRes){0});
@ -2995,3 +2894,94 @@ _return:
CTG_RET(code);
}
int32_t ctgGetViewsFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgViewsCtx *ctx, int32_t dbIdx,
int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) {
int32_t tbNum = taosArrayGetSize(pList);
SName *pName = taosArrayGet(pList, 0);
char dbFName[TSDB_DB_FNAME_LEN] = {0};
int32_t flag = CTG_FLAG_UNKNOWN_STB;
uint64_t lastSuid = 0;
STableMeta *lastTableMeta = NULL;
if (IS_SYS_DBNAME(pName->dbname)) {
CTG_FLAG_SET_SYS_DB(flag);
strcpy(dbFName, pName->dbname);
} else {
tNameGetFullDbName(pName, dbFName);
}
SCtgDBCache *dbCache = NULL;
SCtgViewCache *pCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName);
for (int32_t i = 0; i < tbNum; ++i) {
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArrayPush(ctx->pResList, &(SMetaData){0});
}
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < tbNum; ++i) {
pName = taosArrayGet(pList, i);
pCache = taosHashAcquire(dbCache->viewCache, pName->tname, strlen(pName->tname));
if (NULL == pCache) {
ctgDebug("view %s not in cache, dbFName:%s", pName->tname, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArrayPush(ctx->pResList, &(SMetaRes){0});
CTG_CACHE_NHIT_INC(CTG_CI_VIEW, 1);
continue;
}
CTG_LOCK(CTG_READ, &pCache->viewLock);
if (NULL == pCache->pMeta) {
CTG_UNLOCK(CTG_READ, &pCache->viewLock);
taosHashRelease(dbCache->viewCache, pCache);
ctgDebug("view %s meta not in cache, dbFName:%s", pName->tname, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArrayPush(ctx->pResList, &(SMetaRes){0});
CTG_CACHE_NHIT_INC(CTG_CI_VIEW, 1);
continue;
}
CTG_CACHE_HIT_INC(CTG_CI_VIEW, 1);
SMetaRes res = {0};
SViewMeta *pViewMeta = taosMemoryCalloc(1, sizeof(SViewMeta));
if (NULL == pViewMeta) {
ctgReleaseViewMetaToCache(pCtg, dbCache, pCache);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
pViewMeta->querySql = strdup(pCache->pMeta->querySql);
if (NULL == pViewMeta->querySql) {
ctgReleaseViewMetaToCache(pCtg, dbCache, pCache);
taosMemoryFree(pViewMeta);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
pViewMeta->version = pCache->pMeta->version;
pViewMeta->viewId = pCache->pMeta->viewId;
CTG_UNLOCK(CTG_READ, &pCache->viewLock);
taosHashRelease(dbCache->viewCache, pCache);
ctgDebug("Got view %s meta from cache, dbFName:%s", pName->tname, dbFName);
res.pRes = pViewMeta;
taosArrayPush(ctx->pResList, &res);
}
ctgReleaseDBCache(pCtg, dbCache);
return TSDB_CODE_SUCCESS;
}

View File

@ -391,6 +391,8 @@ int32_t ctgdGetClusterCacheNum(SCatalog *pCtg, int32_t type) {
return ctgdGetRentNum(&pCtg->dbRent);
case CTG_DBG_STB_RENT_NUM:
return ctgdGetRentNum(&pCtg->stbRent);
case CTG_DBG_VIEW_RENT_NUM:
return ctgdGetRentNum(&pCtg->viewRent);
default:
break;
}

View File

@ -318,6 +318,21 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qDebug("Got svr ver from mnode");
break;
}
case TDMT_MND_VIEW_META: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for get view-meta, error:%s, viewFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process get view-meta rsp failed, error:%s, viewFName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
qDebug("Got view-meta from mnode, viewFName:%s", target);
break;
}
default:
if (TSDB_CODE_SUCCESS != rspCode) {
qError("Got error rsp, error:%s", tstrerror(rspCode));
@ -1388,3 +1403,60 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** ou
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pName, SViewMetaOutput* out,
SCtgTaskReq* tReq) {
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_VIEW_META;
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
ctgDebug("try to get view info from mnode, viewFName:%s", fullName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](fullName, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build view-meta msg failed, code:%x, viewFName:%s", code, fullName);
CTG_ERR_RET(code);
}
if (pTask) {
void* pOut = taosMemoryCalloc(1, POINTER_BYTES);
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, fullName));
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
#else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, fullName));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}

287
source/libs/catalog/src/ctgRent.c Executable file
View File

@ -0,0 +1,287 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalogInt.h"
#include "query.h"
#include "systable.h"
#include "tname.h"
#include "trpc.h"
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type, int32_t size) {
mgmt->slotRIdx = 0;
mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
mgmt->type = type;
mgmt->metaSize = size;
size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum;
mgmt->slots = taosMemoryCalloc(1, msgSize);
if (NULL == mgmt->slots) {
qError("calloc %d failed", (int32_t)msgSize);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
mgmt->rentCacheSize = msgSize;
qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);
return TSDB_CODE_SUCCESS;
}
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
int16_t widx = abs((int)(id % mgmt->slotNum));
SCtgRentSlot *slot = &mgmt->slots[widx];
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
if (NULL == slot->meta) {
qError("taosArrayInit %d failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx,
mgmt->type);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
}
if (NULL == taosArrayPush(slot->meta, meta)) {
qError("taosArrayPush meta to rent failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
mgmt->rentCacheSize += size;
slot->needSort = true;
qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
_return:
CTG_UNLOCK(CTG_WRITE, &slot->lock);
CTG_RET(code);
}
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare,
__compar_fn_t searchCompare) {
int16_t widx = abs((int)(id % mgmt->slotNum));
SCtgRentSlot *slot = &mgmt->slots[widx];
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
qDebug("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (slot->needSort) {
qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type,
(int32_t)taosArrayGetSize(slot->meta));
taosArraySort(slot->meta, sortCompare);
slot->needSort = false;
qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
}
void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ);
if (NULL == orig) {
qDebug("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type,
(int32_t)taosArrayGetSize(slot->meta));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
memcpy(orig, meta, size);
qDebug("meta in rent updated, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
_return:
CTG_UNLOCK(CTG_WRITE, &slot->lock);
if (code) {
qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%" PRIx64 ", slot idx:%d, type:%d", code, id,
widx, mgmt->type);
CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
}
CTG_RET(code);
}
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
int16_t widx = abs((int)(id % mgmt->slotNum));
SCtgRentSlot *slot = &mgmt->slots[widx];
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
qError("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (slot->needSort) {
taosArraySort(slot->meta, sortCompare);
slot->needSort = false;
qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
}
int32_t idx = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ);
if (idx < 0) {
qError("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
taosArrayRemove(slot->meta, idx);
mgmt->rentCacheSize -= mgmt->metaSize;
qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
_return:
CTG_UNLOCK(CTG_WRITE, &slot->lock);
CTG_RET(code);
}
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
if (ridx >= mgmt->slotNum) {
ridx %= mgmt->slotNum;
atomic_store_16(&mgmt->slotRIdx, ridx);
}
SCtgRentSlot *slot = &mgmt->slots[ridx];
int32_t code = 0;
CTG_LOCK(CTG_READ, &slot->lock);
if (NULL == slot->meta) {
qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
*num = 0;
goto _return;
}
size_t metaNum = taosArrayGetSize(slot->meta);
if (metaNum <= 0) {
qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
*num = 0;
goto _return;
}
size_t msize = metaNum * size;
*res = taosMemoryMalloc(msize);
if (NULL == *res) {
qError("malloc %d failed", (int32_t)msize);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
void *meta = taosArrayGet(slot->meta, 0);
memcpy(*res, meta, msize);
*num = (uint32_t)metaNum;
qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);
_return:
CTG_UNLOCK(CTG_READ, &slot->lock);
CTG_RET(code);
}
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
while (true) {
int64_t msec = taosGetTimestampMs();
int64_t lsec = atomic_load_64(&mgmt->lastReadMsec);
if ((msec - lsec) < CTG_RENT_SLOT_SECOND * 1000) {
*res = NULL;
*num = 0;
qDebug("too short time period to get expired meta, type:%d", mgmt->type);
return TSDB_CODE_SUCCESS;
}
if (lsec != atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lsec, msec)) {
continue;
}
break;
}
CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));
return TSDB_CODE_SUCCESS;
}
void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) {
if (NULL == dbCache->stbCache) {
return;
}
void *pIter = taosHashIterate(dbCache->stbCache, NULL);
while (pIter) {
uint64_t *suid = NULL;
suid = taosHashGetKey(pIter, NULL);
if (TSDB_CODE_SUCCESS ==
ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) {
ctgDebug("stb removed from rent, suid:0x%" PRIx64, *suid);
}
pIter = taosHashIterate(dbCache->stbCache, pIter);
}
}
int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid,
SCtgTbCache *pCache) {
SSTableVersion metaRent = {.dbId = dbId, .suid = suid};
if (pCache->pMeta) {
metaRent.sversion = pCache->pMeta->sversion;
metaRent.tversion = pCache->pMeta->tversion;
}
if (pCache->pIndex) {
metaRent.smaVer = pCache->pIndex->version;
}
tstrncpy(metaRent.dbFName, dbFName, sizeof(metaRent.dbFName));
tstrncpy(metaRent.stbName, tbName, sizeof(metaRent.stbName));
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableVersion),
ctgStbVersionSortCompare, ctgStbVersionSearchCompare));
ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", dbFName, dbId,
tbName, suid, metaRent.sversion, metaRent.tversion, metaRent.smaVer);
return TSDB_CODE_SUCCESS;
}
int32_t ctgUpdateRentViewVersion(SCatalog *pCtg, char *dbFName, char *viewName, uint64_t dbId, uint64_t viewId,
SCtgViewCache *pCache) {
SViewVersion metaRent = {.dbId = dbId, .viewId = viewId};
metaRent.version = pCache->pMeta->version;
tstrncpy(metaRent.dbFName, dbFName, sizeof(metaRent.dbFName));
tstrncpy(metaRent.viewName, viewName, sizeof(metaRent.viewName));
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->viewRent, &metaRent, metaRent.viewId, sizeof(SViewVersion),
ctgViewVersionSortCompare, ctgViewVersionSearchCompare));
ctgDebug("db %s,0x%" PRIx64 " view %s,0x%" PRIx64 " version %d updated to viewRent", dbFName, dbId, viewName, viewId, metaRent.version);
return TSDB_CODE_SUCCESS;
}

View File

@ -93,6 +93,10 @@ char* ctgTaskTypeStr(CTG_TASK_TYPE type) {
return "[bget table meta]";
case CTG_TASK_GET_TB_HASH_BATCH:
return "[bget table hash]";
case CTG_TASK_GET_TB_TAG:
return "[get table tag]";
case CTG_TASK_GET_VIEW:
return "[get view]";
default:
return "unknown";
}
@ -304,6 +308,7 @@ void ctgFreeInstUserCache(SHashObj* pUserCache) {
void ctgFreeHandleImpl(SCatalog* pCtg) {
ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent);
ctgFreeMetaRent(&pCtg->viewRent);
ctgFreeInstDbCache(pCtg->dbCache);
ctgFreeInstUserCache(pCtg->userCache);
@ -320,6 +325,7 @@ void ctgFreeHandle(SCatalog* pCtg) {
ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent);
ctgFreeMetaRent(&pCtg->viewRent);
ctgFreeInstDbCache(pCtg->dbCache);
ctgFreeInstUserCache(pCtg->userCache);
@ -405,12 +411,14 @@ void ctgClearHandle(SCatalog* pCtg) {
ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent);
ctgFreeMetaRent(&pCtg->viewRent);
ctgFreeInstDbCache(pCtg->dbCache);
ctgFreeInstUserCache(pCtg->userCache);
ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB, sizeof(SDbCacheInfo));
ctgMetaRentInit(&pCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE, sizeof(SSTableVersion));
ctgMetaRentInit(&pCtg->viewRent, gCtgMgmt.cfg.viewRentSec, CTG_RENT_VIEW, sizeof(SViewVersion));
pCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false,
HASH_ENTRY_LOCK);
@ -513,6 +521,12 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
taosMemoryFreeClear(pCtx->out);
break;
}
case TDMT_MND_VIEW_META: {
SViewMetaRsp* pOut = (SViewMetaRsp*)pCtx->out;
taosMemoryFree(pOut->querySql);
taosMemoryFreeClear(pCtx->out);
break;
}
default:
qError("invalid reqType %d", pCtx->reqType);
break;
@ -1202,6 +1216,17 @@ int32_t ctgDbCacheInfoSearchCompare(const void* key1, const void* key2) {
}
}
int32_t ctgViewVersionSearchCompare(const void* key1, const void* key2) {
if (*(uint64_t*)key1 < ((SViewVersion*)key2)->viewId) {
return -1;
} else if (*(uint64_t*)key1 > ((SViewVersion*)key2)->viewId) {
return 1;
} else {
return 0;
}
}
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) {
if (((SSTableVersion*)key1)->suid < ((SSTableVersion*)key2)->suid) {
return -1;
@ -1222,6 +1247,17 @@ int32_t ctgDbCacheInfoSortCompare(const void* key1, const void* key2) {
}
}
int32_t ctgViewVersionSortCompare(const void* key1, const void* key2) {
if (((SViewVersion*)key1)->viewId < ((SViewVersion*)key2)->viewId) {
return -1;
} else if (((SViewVersion*)key1)->viewId > ((SViewVersion*)key2)->viewId) {
return 1;
} else {
return 0;
}
}
int32_t ctgMakeVgArray(SDBVgInfo* dbInfo) {
if (NULL == dbInfo) {
return TSDB_CODE_SUCCESS;
@ -1758,6 +1794,15 @@ uint64_t ctgGetTbIndexCacheSize(STableIndex *pIndex) {
return sizeof(*pIndex) + pIndex->indexSize;
}
uint64_t ctgGetViewMetaCacheSize(SViewMeta *pMeta) {
if (NULL == pMeta) {
return 0;
}
return sizeof(*pMeta) + strlen(pMeta->querySql) + 1;
}
FORCE_INLINE uint64_t ctgGetTbMetaCacheSize(STableMeta *pMeta) {
if (NULL == pMeta) {
return 0;
@ -1870,6 +1915,7 @@ uint64_t ctgGetClusterCacheSize(SCatalog *pCtg) {
cacheSize += pCtg->dbRent.rentCacheSize;
cacheSize += pCtg->stbRent.rentCacheSize;
cacheSize += pCtg->viewRent.rentCacheSize;
return cacheSize;
}
@ -1958,3 +2004,20 @@ void ctgGetGlobalCacheSize(uint64_t *pSize) {
}
}
int32_t ctgBuildViewNullRes(SCtgTask* pTask, SCtgViewsCtx* pCtx) {
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
for (int32_t i = 0; i < dbNum; ++i) {
STablesReq* pReq = taosArrayGet(pCtx->pNames, i);
int32_t viewNum = taosArrayGetSize(pReq->pTables);
ctgDebug("start to check views in db %s, viewNum %d", pReq->dbFName, viewNum);
for (int32_t m = 0; m < viewNum; ++m) {
taosArrayPush(pCtx->pResList, &(SMetaData){0});
}
}
return TSDB_CODE_SUCCESS;
}

View File

@ -658,26 +658,17 @@ static int32_t collectMetaKeyFromGrant(SCollectMetaKeyCxt* pCxt, SGrantStmt* pSt
static int32_t collectMetaKeyFromCreateViewStmt(SCollectMetaKeyCxt* pCxt, SCreateViewStmt* pStmt) {
int32_t code =
reserveViewMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
}
reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = reserveUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, NULL, AUTH_TYPE_WRITE,
pCxt->pMetaCache);
}
return code;
}
static int32_t collectMetaKeyFromDropViewStmt(SCollectMetaKeyCxt* pCxt, SDropViewStmt* pStmt) {
int32_t code =
reserveViewMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, pStmt->viewName, AUTH_TYPE_WRITE,
int32_t code = reserveUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, NULL, AUTH_TYPE_WRITE,
pCxt->pMetaCache);
}
return code;
}

View File

@ -7267,6 +7267,13 @@ static int32_t validateCreateView(STranslateContext* pCxt, SCreateViewStmt* pStm
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type");
}
STableMeta* pMetaCache = NULL;
int32_t code = getTableMeta(pCxt, pStmt->dbName, pStmt->viewName, &pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
taosMemoryFreeClear(pMetaCache);
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is conflict with table");
}
return TSDB_CODE_SUCCESS;
}

View File

@ -691,9 +691,11 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
if (TSDB_CODE_SUCCESS == code) {
code = buildTableReq(pMetaCache->pTableCfg, &pCatalogReq->pTableCfg);
}
#ifdef TD_ENTERPRISE
if (TSDB_CODE_SUCCESS == code) {
code = buildTableReqFromDb(pMetaCache->pViews, &pCatalogReq->pView);
code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pView);
}
#endif
pCatalogReq->dNodeRequired = pMetaCache->dnodeRequired;
return code;
}
@ -886,14 +888,6 @@ int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCac
return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pTableMeta);
}
int32_t reserveViewMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
return reserveTableReqInDbCache(acctId, pDb, pTable, &pMetaCache->pViews);
}
int32_t reserveViewMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) {
return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pViews);
}
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);

View File

@ -286,6 +286,24 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetViewMetaMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
SViewMetaReq req = {0};
strncpy(req.fullname, input, sizeof(req.fullname) - 1);
int32_t bufLen = tSerializeSViewMetaReq(NULL, 0, &req);
void *pBuf = (*mallcFp)(bufLen);
tSerializeSViewMetaReq(pBuf, bufLen, &req);
*msg = pBuf;
*msgLen = bufLen;
return TSDB_CODE_SUCCESS;
}
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
SUseDbOutput *pOut = output;
SUseDbRsp usedbRsp = {0};
@ -637,6 +655,25 @@ int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) {
return TSDB_CODE_SUCCESS;
}
int32_t queryProcessGetViewMetaRsp(void *output, char *msg, int32_t msgSize) {
if (NULL == output || NULL == msg || msgSize <= 0) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
SViewMetaRsp *out = taosMemoryCalloc(1, sizeof(SViewMetaRsp));
if (tDeserializeSViewMetaRsp(msg, msgSize, out) != 0) {
qError("tDeserializeSViewMetaRsp failed, msgSize:%d", msgSize);
tFreeSViewMetaRsp(out);
taosMemoryFree(out);
return TSDB_CODE_INVALID_MSG;
}
*(SViewMetaRsp **)output = out;
return TSDB_CODE_SUCCESS;
}
void initQueryModuleMsgHandle() {
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
@ -651,6 +688,7 @@ void initQueryModuleMsgHandle() {
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryBuildGetTbCfgMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryBuildGetTbCfgMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryBuildGetSerVerMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_VIEW_META)] = queryBuildGetViewMetaMsg;
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
@ -665,6 +703,7 @@ void initQueryModuleMsgHandle() {
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryProcessGetSerVerRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_VIEW_META)] = queryProcessGetViewMetaRsp;
}
#pragma GCC diagnostic pop

View File

@ -588,6 +588,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal err
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream query")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_QUERY_MISMATCH, "Columns number mismatch with query result")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is conflict with table")
//planner
TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "Planner internal error")