diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bb843ced91..28fa968eed 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -141,6 +141,7 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_APPS, TSDB_MGMT_TABLE_STREAM_TASKS, TSDB_MGMT_TABLE_PRIVILEGES, + TSDB_MGMT_TABLE_VIEWS, TSDB_MGMT_TABLE_MAX, } EShowType; @@ -251,6 +252,7 @@ typedef enum ENodeType { QUERY_NODE_CASE_WHEN, QUERY_NODE_EVENT_WINDOW, QUERY_NODE_HINT, + QUERY_NODE_VIEW, // Statement nodes are used in parser and planner module. QUERY_NODE_SET_OPERATOR = 100, @@ -354,7 +356,9 @@ typedef enum ENodeType { QUERY_NODE_RESTORE_MNODE_STMT, QUERY_NODE_RESTORE_VNODE_STMT, QUERY_NODE_PAUSE_STREAM_STMT, - QUERY_NODE_RESUME_STREAM_STMT, + QUERY_NODE_RESUME_STREAM_STMT, + QUERY_NODE_CREATE_VIEW_STMT, + QUERY_NODE_DROP_VIEW_STMT, // logic plan node QUERY_NODE_LOGIC_PLAN_SCAN = 1000, @@ -3847,6 +3851,32 @@ typedef struct { }; } SPackedData; +typedef struct { + char name[TSDB_VIEW_NAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; + int8_t orReplace; + int8_t precision; + int32_t numOfCols; + SSchema* pSchema; +} SCMCreateViewReq; + +typedef struct { + int64_t streamId; +} SCMCreateViewRsp; + +int32_t tSerializeSCMCreateViewReq(void* buf, int32_t bufLen, const SCMCreateViewReq* pReq); +int32_t tDeserializeSCMCreateViewReq(void* buf, int32_t bufLen, SCMCreateViewReq* pReq); +void tFreeSCMCreateViewReq(SCMCreateViewReq* pReq); + + +typedef struct { + char dbFName[TSDB_DB_FNAME_LEN]; + char viewName[TSDB_VIEW_NAME_LEN]; + int8_t igNotExists; +} SCMDropViewReq; + + + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index fb2c780724..f9fdfb90c1 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -187,6 +187,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_VIEW, "create-view", SCMCreateViewReq, SCMCreateViewRsp) TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 7a7a13b285..8f5679c846 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -51,6 +51,7 @@ typedef enum { typedef struct SUserAuthInfo { char user[TSDB_USER_LEN]; SName tbName; + bool isView; AUTH_TYPE type; } SUserAuthInfo; @@ -83,6 +84,7 @@ typedef struct SCatalogReq { SArray* pTableIndex; // element is SNAME SArray* pTableCfg; // element is SNAME SArray* pTableTag; // element is SNAME + SArray* pView; // element is STablesReq bool qNodeRequired; // valid qnode bool dNodeRequired; // valid dnode bool svrVerRequired; @@ -109,6 +111,7 @@ typedef struct SMetaData { SArray* pTableCfg; // pRes = STableCfg* SArray* pTableTag; // pRes = SArray* SArray* pDnodeList; // pRes = SArray* + SArray* pView; // pRes = SViewInfo* SMetaRes* pSvrVer; // pRes = char* } SMetaData; diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 40b9d21503..c81835f3b2 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -488,6 +488,27 @@ typedef struct SDropFunctionStmt { bool ignoreNotExists; } SDropFunctionStmt; +typedef struct SQueryResInfo { + +} SQueryResInfo; + +typedef struct SCreateViewStmt { + ENodeType type; + char dbName[TSDB_DB_NAME_LEN]; + char viewName[TSDB_VIEW_NAME_LEN]; + bool orReplace; + SNodeList* pCols; + SNode* pQuery; + SCMCreateViewReq createReq; +} SCreateViewStmt; + +typedef struct SDropViewStmt { + ENodeType type; + char dbName[TSDB_DB_NAME_LEN]; + char viewName[TSDB_VIEW_NAME_LEN]; + bool ignoreNotExists; +} SDropViewStmt; + typedef struct SGrantStmt { ENodeType type; char userName[TSDB_USER_LEN]; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index d7d45c57ad..86ac5d8a10 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -180,6 +180,16 @@ typedef struct STempTableNode { SNode* pSubquery; } STempTableNode; +typedef struct SViewNode { + STableNode table; // QUERY_NODE_REAL_TABLE + struct STableMeta* pMeta; + SVgroupsInfo* pVgroupList; + char qualDbName[TSDB_DB_NAME_LEN]; // SHOW qualDbName.TABLES + double ratio; + SArray* pSmaIndexes; + int8_t cacheLastMode; +} SViewNode; + typedef enum EJoinType { JOIN_TYPE_INNER = 1, JOIN_TYPE_LEFT, diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 2dc5c8f112..66fdd6f66c 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -33,6 +33,8 @@ typedef struct SStmtCallback { int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**); } SStmtCallback; +typedef int32_t (*validateSqlFn)(void* param, SQuery* pQuery, const char* sql, SQueryResInfo* pRes); + typedef struct SParseCsvCxt { TdFilePtr fp; // last parsed file int32_t tableNo; // last parsed table @@ -64,6 +66,8 @@ typedef struct SParseContext { SArray* pTableMetaPos; // sql table pos => catalog data pos SArray* pTableVgroupPos; // sql table pos => catalog data pos int64_t allocatorId; + validateSqlFn validateSqlFp; + void* validateSqlParam; } SParseContext; int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b64c015284..bc5e1fcded 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -711,6 +711,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED TAOS_DEF_ERROR_CODE(0, 0x2669) #define TSDB_CODE_PAR_INVALID_VARBINARY TAOS_DEF_ERROR_CODE(0, 0x266A) #define TSDB_CODE_PAR_INVALID_IP_RANGE TAOS_DEF_ERROR_CODE(0, 0x266B) +#define TSDB_CODE_PAR_INVALID_VIEW_QUERY TAOS_DEF_ERROR_CODE(0, 0x266C) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/include/util/tdef.h b/include/util/tdef.h index 3bfa136d3e..e58154c399 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -230,6 +230,9 @@ typedef enum ELogicConditionType { #define TSDB_MAX_SQL_SHOW_LEN 1024 #define TSDB_MAX_ALLOWED_SQL_LEN (1 * 1024 * 1024u) // sql length should be less than 1mb +#define TSDB_VIEW_NAME_LEN 193 +#define TSDB_VIEW_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_VIEW_NAME_LEN + TSDB_NAME_DELIMITER_LEN) + #define TSDB_APP_NAME_LEN TSDB_UNI_LEN #define TSDB_TB_COMMENT_LEN 1025 diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c78ba4c4a0..db219c5c17 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2591,3 +2591,48 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param schedulerFetchRows(pRequest->body.queryJob, &req); } + +int32_t clientValidateSql(void* param, SQuery* pQuery, const char* sql, SCMCreateViewReq* pReq) { + SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param; + SSyncQueryParam* syncParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); + tsem_init(&syncParam->sem, 0, 0); + + SRequestObj* pRequest = pWrapper->pRequest; + SRequestObj* pNewRequest = NULL; + int32_t code = buildRequest(pRequest->pTscObj->id, sql, strlen(sql), syncParam, true, &pNewRequest, 0); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return code; + } + + pNewRequest->pQuery = pQuery; + pNewRequest->body.queryFp = syncQueryFn; + doAsyncQuery(pNewRequest, false); + + tsem_wait(&syncParam->sem); + + code = pNewRequest->code; + pRequest->code = code; + + if (TSDB_CODE_SUCCESS == code) { + pReq->numOfCols = pQuery->numOfResCols; + pReq->precision = pQuery->precision; + pReq->pSchema = taosMemoryMalloc(pQuery->numOfResCols * sizeof(SSchema)); + if (NULL == pReq->pSchema) { + code = terrno = TSDB_CODE_OUT_OF_MEMORY; + } else { + memcpy(pReq->pSchema, pQuery->pResSchema, pQuery->numOfResCols * sizeof(SSchema)); + } + } else if (0 != pNewRequest->msgBuf[0]) { + strncpy(pRequest->msgBuf, pNewRequest->msgBuf, pRequest->msgBufLen - 1); + pRequest->msgBuf[pRequest->msgBufLen - 1] = 0; + } + + freeQueryParam(syncParam); + destroyRequest(pNewRequest); + + return code; +} + + + diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index ac7a6e6646..3a6bdde408 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -1026,7 +1026,7 @@ void taos_query_a_with_reqid(TAOS *taos, const char *sql, __taos_async_fn_t fp, taosAsyncQueryImplWithReqid(connId, sql, fp, param, false, reqid); } -int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { +int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt, SSqlCallbackWrapper *pWrapper) { const STscObj *pTscObj = pRequest->pTscObj; *pCxt = taosMemoryCalloc(1, sizeof(SParseContext)); @@ -1051,7 +1051,9 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { .async = true, .svrVer = pTscObj->sVer, .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes), - .allocatorId = pRequest->allocatorRefId}; + .allocatorId = pRequest->allocatorRefId, + .validateSqlFp = clientValidateSql, + .validateSqlParam = pWrapper}; return TSDB_CODE_SUCCESS; } @@ -1068,7 +1070,7 @@ int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *p } if (TSDB_CODE_SUCCESS == code) { - code = createParseContext(pRequest, &pWrapper->pParseCtx); + code = createParseContext(pRequest, &pWrapper->pParseCtx, pWrapper); } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 0553f73bb3..998ac3717a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -8215,3 +8215,65 @@ void tDeleteMqSubTopicEp(SMqSubTopicEp *pSubTopicEp) { pSubTopicEp->schema.nCols = 0; taosArrayDestroy(pSubTopicEp->vgs); } + +int32_t tSerializeSCMCreateViewReq(void *buf, int32_t bufLen, const SCMCreateViewReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->dbFName) < 0) return -1; + if (tEncodeI8(&encoder, pReq->orReplace) < 0) return -1; + if (tEncodeI8(&encoder, pReq->precision) < 0) return -1; + if (tEncodeI32(&encoder, pReq->numOfCols) < 0) return -1; + for (int32_t i = 0; i < pReq->numOfCols; ++i) { + SSchema *pSchema = &pReq->pSchema[i]; + if (tEncodeSSchema(&encoder, pSchema) < 0) return -1; + } + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSCMCreateViewReq(void *buf, int32_t bufLen, SCMCreateViewReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->dbFName) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->orReplace) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->numOfCols) < 0) return -1; + + if (pReq->numOfCols > 0) { + pReq->pSchema = taosArrayInit(pReq->numOfCols, sizeof(SSchema)); + if (pReq->pSchema == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + for (int32_t i = 0; i < pReq->numOfCols; ++i) { + SSchema* pSchema = pReq->pSchema + i; + if (tDecodeSSchema(&decoder, pSchema) < 0) return -1; + } + } + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +void tFreeSCMCreateViewReq(SCMCreateViewReq* pReq) { + if (NULL == pReq) { + return; + } + + taosMemoryFree(pReq->pSchema); +} + + diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 4c43326959..7a40370fbc 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -182,6 +182,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_INDEX, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_INDEX, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RESTORE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_VIEW, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 90d54e7f58..299197a690 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -725,6 +725,62 @@ void tFreeStreamObj(SStreamObj* pObj); // SArray* childInfo; // SArray // } SStreamCheckpointObj; + +typedef struct { + char name[TSDB_STREAM_FNAME_LEN]; + // ctl + SRWLatch lock; + // create info + int64_t createTime; + int64_t updateTime; + int32_t version; + int32_t totalLevel; + int64_t smaId; // 0 for unused + // info + int64_t uid; + int8_t status; + SStreamConf conf; + // source and target + int64_t sourceDbUid; + int64_t targetDbUid; + char sourceDb[TSDB_DB_FNAME_LEN]; + char targetDb[TSDB_DB_FNAME_LEN]; + char targetSTbName[TSDB_TABLE_FNAME_LEN]; + int64_t targetStbUid; + + // fixedSinkVg is not applicable for encode and decode + SVgObj fixedSinkVg; + int32_t fixedSinkVgId; // 0 for shuffle + + // transformation + char* sql; + char* ast; + char* physicalPlan; + SArray* tasks; // SArray> + + SArray* pHTasksList; // generate the results for already stored ts data + int64_t hTaskUid; // stream task for history ts data + + SSchemaWrapper outputSchema; + SSchemaWrapper tagSchema; + + // 3.0.20 + int64_t checkpointFreq; // ms + int64_t currentTick; // do not serialize + int64_t deleteMark; + int8_t igCheckUpdate; + + // 3.0.5. + int64_t checkpointId; + char reserve[256]; + +} SViewObj; + +int32_t tEncodeSViewObj(SEncoder* pEncoder, const SViewObj* pObj); +int32_t tDecodeSViewObj(SDecoder* pDecoder, SViewObj* pObj, int32_t sver); +void tFreeSViewObj(SViewObj* pObj); + + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndView.h b/source/dnode/mnode/impl/inc/mndView.h new file mode 100755 index 0000000000..19fd2a3fd4 --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndView.h @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_STREAM_H_ +#define _TD_MND_STREAM_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitStream(SMnode *pMnode); +void mndCleanupStream(SMnode *pMnode); + +SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName); +void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); + +SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); +SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); + +int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); +int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); + +// for sma +// TODO refactor +int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); + +int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_STREAM_H_*/ diff --git a/source/dnode/mnode/impl/src/mndView.c b/source/dnode/mnode/impl/src/mndView.c new file mode 100755 index 0000000000..1e58b9f3a9 --- /dev/null +++ b/source/dnode/mnode/impl/src/mndView.c @@ -0,0 +1,797 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "mndView.h" + +int32_t mndInitView(SMnode *pMnode) { + SSdbTable table = { + .sdbType = SDB_VIEW, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndViewActionEncode, + .decodeFp = (SdbDecodeFp)mndViewActionDecode, + .insertFp = (SdbInsertFp)mndViewActionInsert, + .updateFp = (SdbUpdateFp)mndViewActionUpdate, + .deleteFp = (SdbDeleteFp)mndViewActionDelete, + }; + + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); + mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck); + + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndRetrieveStream); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndCancelGetNextStream); + + taosThreadMutexInit(&execNodeList.lock, NULL); + execNodeList.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); + execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskStatusEntry)); + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupView(SMnode *pMnode) { + taosArrayDestroy(execNodeList.pTaskList); + taosHashCleanup(execNodeList.pTaskMap); + taosThreadMutexDestroy(&execNodeList.lock); + mDebug("mnd view cleanup"); +} + +int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SViewObj *pObj) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; + + if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1; + if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; + if (tEncodeI32(pEncoder, pObj->totalLevel) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1; + + if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; + + if (tEncodeI8(pEncoder, pObj->conf.igExpired) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->conf.trigger) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->conf.fillHistory) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->conf.triggerParam) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->conf.watermark) < 0) return -1; + + if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1; + if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1; + + if (pObj->sql != NULL) { + if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; + } else { + if (tEncodeCStr(pEncoder, "") < 0) return -1; + } + + if (pObj->ast != NULL) { + if (tEncodeCStr(pEncoder, pObj->ast) < 0) return -1; + } else { + if (tEncodeCStr(pEncoder, "") < 0) return -1; + } + + if (pObj->physicalPlan != NULL) { + if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; + } else { + if (tEncodeCStr(pEncoder, "") < 0) return -1; + } + + int32_t sz = taosArrayGetSize(pObj->tasks); + if (tEncodeI32(pEncoder, sz) < 0) return -1; + for (int32_t i = 0; i < sz; i++) { + SArray *pArray = taosArrayGetP(pObj->tasks, i); + int32_t innerSz = taosArrayGetSize(pArray); + if (tEncodeI32(pEncoder, innerSz) < 0) return -1; + for (int32_t j = 0; j < innerSz; j++) { + SStreamTask *pTask = taosArrayGetP(pArray, j); + pTask->ver = SSTREAM_TASK_VER; + if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1; + } + } + + if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1; + + // 3.0.20 ver =2 + if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->igCheckUpdate) < 0) return -1; + + // 3.0.50 ver = 3 + if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1; + + if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1) < 0) return -1; + + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeSStreamObj(SDecoder *pDecoder, SViewObj *pObj, int32_t sver) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1; + + if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1; + if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; + if (tDecodeI32(pDecoder, &pObj->totalLevel) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1; + + if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; + + if (tDecodeI8(pDecoder, &pObj->conf.igExpired) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->conf.trigger) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->conf.fillHistory) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->conf.triggerParam) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->conf.watermark) < 0) return -1; + + if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1; + if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1; + + if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; + if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1; + if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1; + + pObj->tasks = NULL; + int32_t sz; + if (tDecodeI32(pDecoder, &sz) < 0) return -1; + if (sz != 0) { + pObj->tasks = taosArrayInit(sz, sizeof(void *)); + for (int32_t i = 0; i < sz; i++) { + int32_t innerSz; + if (tDecodeI32(pDecoder, &innerSz) < 0) return -1; + SArray *pArray = taosArrayInit(innerSz, sizeof(void *)); + for (int32_t j = 0; j < innerSz; j++) { + SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + if (pTask == NULL) { + taosArrayDestroy(pArray); + return -1; + } + if (tDecodeStreamTask(pDecoder, pTask) < 0) { + taosMemoryFree(pTask); + taosArrayDestroy(pArray); + return -1; + } + taosArrayPush(pArray, &pTask); + } + taosArrayPush(pObj->tasks, &pArray); + } + } + + if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1; + + // 3.0.20 + if (sver >= 2) { + if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1; + if (!tDecodeIsEnd(pDecoder)) { + if (tDecodeI8(pDecoder, &pObj->igCheckUpdate) < 0) return -1; + } + } + if (sver >= 3) { + if (tDecodeI64(pDecoder, &pObj->checkpointId) < 0) return -1; + } + if (tDecodeCStrTo(pDecoder, pObj->reserve) < 0) return -1; + + tEndDecode(pDecoder); + return 0; +} + +void tFreeStreamObj(SViewObj *pStream) { + taosMemoryFree(pStream->sql); + taosMemoryFree(pStream->ast); + taosMemoryFree(pStream->physicalPlan); + + if (pStream->outputSchema.nCols || pStream->outputSchema.pSchema) { + taosMemoryFree(pStream->outputSchema.pSchema); + } + + pStream->tasks = freeStreamTasks(pStream->tasks); + pStream->pHTasksList = freeStreamTasks(pStream->pHTasksList); + + if (pStream->tagSchema.nCols > 0) { + taosMemoryFree(pStream->tagSchema.pSchema); + } +} + + +SSdbRaw *mndViewActionEncode(SViewObj *pStream) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + void *buf = NULL; + + SEncoder encoder; + tEncoderInit(&encoder, NULL, 0); + if (tEncodeSViewObj(&encoder, pStream) < 0) { + tEncoderClear(&encoder); + goto STREAM_ENCODE_OVER; + } + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE; + SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size); + if (pRaw == NULL) goto STREAM_ENCODE_OVER; + + buf = taosMemoryMalloc(tlen); + if (buf == NULL) goto STREAM_ENCODE_OVER; + + tEncoderInit(&encoder, buf, tlen); + if (tEncodeSStreamObj(&encoder, pStream) < 0) { + tEncoderClear(&encoder); + goto STREAM_ENCODE_OVER; + } + tEncoderClear(&encoder); + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, buf, tlen, STREAM_ENCODE_OVER); + SDB_SET_DATALEN(pRaw, dataPos, STREAM_ENCODE_OVER); + + terrno = TSDB_CODE_SUCCESS; + +STREAM_ENCODE_OVER: + taosMemoryFreeClear(buf); + if (terrno != TSDB_CODE_SUCCESS) { + mError("stream:%s, failed to encode to raw:%p since %s", pStream->name, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("stream:%s, encode to raw:%p, row:%p", pStream->name, pRaw, pStream); + return pRaw; +} + +SSdbRow *mndViewActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRow *pRow = NULL; + SViewObj *pStream = NULL; + void *buf = NULL; + + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) { + goto STREAM_DECODE_OVER; + } + + if (sver != MND_STREAM_VER_NUMBER) { + terrno = 0; + mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER); + goto STREAM_DECODE_OVER; + } + + pRow = sdbAllocRow(sizeof(SViewObj)); + if (pRow == NULL) goto STREAM_DECODE_OVER; + + pStream = sdbGetRowObj(pRow); + if (pStream == NULL) goto STREAM_DECODE_OVER; + + int32_t tlen; + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, dataPos, &tlen, STREAM_DECODE_OVER); + buf = taosMemoryMalloc(tlen + 1); + if (buf == NULL) goto STREAM_DECODE_OVER; + SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER); + + SDecoder decoder; + tDecoderInit(&decoder, buf, tlen + 1); + if (tDecodeSStreamObj(&decoder, pStream, sver) < 0) { + tDecoderClear(&decoder); + goto STREAM_DECODE_OVER; + } + tDecoderClear(&decoder); + + terrno = TSDB_CODE_SUCCESS; + +STREAM_DECODE_OVER: + taosMemoryFreeClear(buf); + if (terrno != TSDB_CODE_SUCCESS) { + mError("stream:%s, failed to decode from raw:%p since %s", pStream == NULL ? "null" : pStream->name, pRaw, + terrstr()); + taosMemoryFreeClear(pRow); + return NULL; + } + + mTrace("stream:%s, decode from raw:%p, row:%p", pStream->name, pRaw, pStream); + return pRow; +} + +static int32_t mndViewActionInsert(SSdb *pSdb, SViewObj *pView) { + mTrace("view:%s, perform insert action", pView->name); + return 0; +} + +static int32_t mndViewActionDelete(SSdb *pSdb, SViewObj *pStream) { + mTrace("stream:%s, perform delete action", pStream->name); + taosWLockLatch(&pStream->lock); + tFreeStreamObj(pStream); + taosWUnLockLatch(&pStream->lock); + return 0; +} + +static int32_t mndViewActionUpdate(SSdb *pSdb, SViewObj *pOldStream, SViewObj *pNewStream) { + mTrace("stream:%s, perform update action", pOldStream->name); + + atomic_exchange_32(&pOldStream->version, pNewStream->version); + + taosWLockLatch(&pOldStream->lock); + + pOldStream->status = pNewStream->status; + pOldStream->updateTime = pNewStream->updateTime; + + taosWUnLockLatch(&pOldStream->lock); + return 0; +} + +SViewObj *mndAcquireView(SMnode *pMnode, char *viewName) { + SSdb *pSdb = pMnode->pSdb; + SViewObj *pStream = sdbAcquire(pSdb, SDB_VIEW, viewName); + if (pStream == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + } + return pStream; +} + +void mndReleaseStream(SMnode *pMnode, SViewObj *pStream) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pStream); +} + +static int32_t mndBuildViewObj(SMnode *pMnode, SViewObj *pObj, SCMCreateStreamReq *pCreate) { + SNode *pAst = NULL; + SQueryPlan *pPlan = NULL; + + mInfo("stream:%s to create", pCreate->name); + memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN); + pObj->createTime = taosGetTimestampMs(); + pObj->updateTime = pObj->createTime; + pObj->version = 1; + pObj->smaId = 0; + + pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name)); + + char p[TSDB_STREAM_FNAME_LEN + 32] = {0}; + snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory"); + + pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name)); + pObj->status = 0; + + pObj->conf.igExpired = pCreate->igExpired; + pObj->conf.trigger = pCreate->triggerType; + pObj->conf.triggerParam = pCreate->maxDelay; + pObj->conf.watermark = pCreate->watermark; + pObj->conf.fillHistory = pCreate->fillHistory; + pObj->deleteMark = pCreate->deleteMark; + pObj->igCheckUpdate = pCreate->igUpdate; + + memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN); + SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); + if (pSourceDb == NULL) { + mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr()); + return -1; + } + pObj->sourceDbUid = pSourceDb->uid; + mndReleaseDb(pMnode, pSourceDb); + + memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN); + + SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName); + if (pTargetDb == NULL) { + mInfo("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr()); + return -1; + } + tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN); + + if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) { + pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN); + } else { + pObj->targetStbUid = pCreate->targetStbUid; + } + pObj->targetDbUid = pTargetDb->uid; + mndReleaseDb(pMnode, pTargetDb); + + pObj->sql = pCreate->sql; + pObj->ast = pCreate->ast; + + pCreate->sql = NULL; + pCreate->ast = NULL; + + // deserialize ast + if (nodesStringToNode(pObj->ast, &pAst) < 0) { + goto FAIL; + } + + // extract output schema from ast + if (qExtractResultSchema(pAst, (int32_t *)&pObj->outputSchema.nCols, &pObj->outputSchema.pSchema) != 0) { + goto FAIL; + } + + int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols); + if (numOfNULL > 0) { + pObj->outputSchema.nCols += numOfNULL; + SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema)); + if (!pFullSchema) { + goto FAIL; + } + + int32_t nullIndex = 0; + int32_t dataIndex = 0; + for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) { + SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex); + if (nullIndex >= numOfNULL || i < pos->slotId) { + pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; + pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; + pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags; + strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name); + pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type; + dataIndex++; + } else { + pFullSchema[i].bytes = 0; + pFullSchema[i].colId = pos->colId; + pFullSchema[i].flags = COL_SET_NULL; + memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN); + pFullSchema[i].type = pos->type; + nullIndex++; + } + } + taosMemoryFree(pObj->outputSchema.pSchema); + pObj->outputSchema.pSchema = pFullSchema; + } + + SPlanContext cxt = { + .pAstRoot = pAst, + .topicQuery = false, + .streamQuery = true, + .triggerType = pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger, + .watermark = pObj->conf.watermark, + .igExpired = pObj->conf.igExpired, + .deleteMark = pObj->deleteMark, + .igCheckUpdate = pObj->igCheckUpdate, + }; + + // using ast and param to build physical plan + if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { + goto FAIL; + } + + // save physcial plan + if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) { + goto FAIL; + } + + pObj->tagSchema.nCols = pCreate->numOfTags; + if (pCreate->numOfTags) { + pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema)); + } + /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/ + for (int32_t i = 0; i < pCreate->numOfTags; i++) { + SField *pField = taosArrayGet(pCreate->pTags, i); + pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1; + pObj->tagSchema.pSchema[i].bytes = pField->bytes; + pObj->tagSchema.pSchema[i].flags = pField->flags; + pObj->tagSchema.pSchema[i].type = pField->type; + memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN); + } + +FAIL: + if (pAst != NULL) nodesDestroyNode(pAst); + if (pPlan != NULL) qDestroyQueryPlan(pPlan); + return 0; +} + + +static int32_t mndCreateView(SMnode *pMnode, char *acct, SCreateUserReq *pCreate, SRpcMsg *pReq) { + SUserObj userObj = {0}; + taosEncryptPass_c((uint8_t *)pCreate->pass, strlen(pCreate->pass), userObj.pass); + tstrncpy(userObj.user, pCreate->user, TSDB_USER_LEN); + tstrncpy(userObj.acct, acct, TSDB_USER_LEN); + userObj.createdTime = taosGetTimestampMs(); + userObj.updateTime = userObj.createdTime; + userObj.superUser = 0; // pCreate->superUser; + userObj.sysInfo = pCreate->sysInfo; + userObj.enable = pCreate->enable; + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-user"); + if (pTrans == NULL) { + mError("user:%s, failed to create since %s", pCreate->user, terrstr()); + return -1; + } + mInfo("trans:%d, used to create user:%s", pTrans->id, pCreate->user); + + SSdbRaw *pCommitRaw = mndUserActionEncode(&userObj); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to commit redo log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + mndTransDrop(pTrans); + return 0; +} + +static int32_t mndProcessCreateViewReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + SViewObj *pStream = NULL; + SDbObj *pDb = NULL; + SCMCreateViewReq createViewReq = {0}; + SViewObj streamObj = {0}; + + if (tDeserializeSCMCreateViewReq(pReq->pCont, pReq->contLen, &createViewReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + mInfo("start to create view:%s, sql:%s", createViewReq.name, createViewReq.sql); + + pStream = mndAcquireStream(pMnode, createStreamReq.name); + if (pStream != NULL) { + if (createStreamReq.igExists) { + mInfo("stream:%s, already exist, ignore exist is set", createStreamReq.name); + code = 0; + goto _OVER; + } else { + terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST; + goto _OVER; + } + } else if (terrno != TSDB_CODE_MND_STREAM_NOT_EXIST) { + goto _OVER; + } + + if (mndCreateView(pMnode, &streamObj, &createStreamReq) < 0) { + mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); + goto _OVER; + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream"); + if (pTrans == NULL) { + mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); + goto _OVER; + } + + mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); + + mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); + if (mndTransCheckConflict(pMnode, pTrans) != 0) { + mndTransDrop(pTrans); + goto _OVER; + } + + if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb) != 0) { + mndTransDrop(pTrans); + goto _OVER; + } + + if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb) != 0) { + mndTransDrop(pTrans); + goto _OVER; + } + + // execute creation + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + goto _OVER; + } + + mndTransDrop(pTrans); + + taosThreadMutexLock(&execNodeList.lock); + mDebug("register to stream task node list"); + keepStreamTasksInBuf(&streamObj, &execNodeList); + taosThreadMutexUnlock(&execNodeList.lock); + + code = TSDB_CODE_ACTION_IN_PROGRESS; + + char detail[2000] = {0}; + sprintf(detail, + "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64 + ", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", maxDelay:%" PRId64 + ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64, + createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark, + createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate, + createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB, + createStreamReq.targetStbFullName, createStreamReq.triggerType, createStreamReq.watermark); + + SName name = {0}; + tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB); + //reuse this function for stream + + auditRecord(pReq, pMnode->clusterId, "createView", name.dbname, "", detail); + +_OVER: + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); + } + + mndReleaseStream(pMnode, pStream); + + tFreeSCMCreateStreamReq(&createStreamReq); + tFreeStreamObj(&streamObj); + return code; +} + +static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SViewObj *pStream = NULL; + + SMDropStreamReq dropReq = {0}; + if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + pStream = mndAcquireStream(pMnode, dropReq.name); + + if (pStream == NULL) { + if (dropReq.igNotExists) { + mInfo("stream:%s, not exist, ignore not exist is set", dropReq.name); + sdbRelease(pMnode->pSdb, pStream); + return 0; + } else { + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + return -1; + } + } + + if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream"); + if (pTrans == NULL) { + mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + + mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name); + + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + if (mndTransCheckConflict(pMnode, pTrans) != 0) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + // mndTransSetSerial(pTrans); + + // drop all tasks + if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { + mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + // drop stream + if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + char detail[100] = {0}; + sprintf(detail, "igNotExists:%d", dropReq.igNotExists); + + SName name = {0}; + tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB); + //reuse this function for stream + + auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, "", detail); + + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + + return TSDB_CODE_ACTION_IN_PROGRESS; +} + +static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SViewObj *pStream = NULL; + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); + if (pShow->pIter == NULL) break; + + SColumnInfoData *pColInfo; + SName n; + int32_t cols = 0; + + char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false); + + char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)sql, false); + + char status[20 + VARSTR_HEADER_SIZE] = {0}; + char status2[20] = {0}; + mndShowStreamStatus(status2, pStream); + STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&status, false); + + char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&sourceDB, false); + + char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&targetDB, false); + + if (pStream->targetSTbName[0] == 0) { + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, NULL, true); + } else { + char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&targetSTB, false); + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false); + + char trigger[20 + VARSTR_HEADER_SIZE] = {0}; + char trigger2[20] = {0}; + mndShowStreamTrigger(trigger2, pStream); + STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false); + + numOfRows++; + sdbRelease(pSdb, pStream); + } + + pShow->numOfRows += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} + + + diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 695373d220..ddde645fae 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -148,7 +148,8 @@ typedef enum { SDB_DB = 19, SDB_FUNC = 20, SDB_IDX = 21, - SDB_MAX = 22 + SDB_VIEW = 22, + SDB_MAX = 23 } ESdbType; typedef struct SSdbRaw { diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index c72b03817b..ff3f56425b 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -269,6 +269,10 @@ const char* nodesNodeName(ENodeType type) { return "RestoreMnodeStmt"; case QUERY_NODE_RESTORE_VNODE_STMT: return "RestoreVnodeStmt"; + case QUERY_NODE_CREATE_VIEW_STMT: + return "CreateViewStmt"; + case QUERY_NODE_DROP_VIEW_STMT: + return "DropViewStmt"; case QUERY_NODE_LOGIC_PLAN_SCAN: return "LogicScan"; case QUERY_NODE_LOGIC_PLAN_JOIN: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index a5b9f6dd91..d28ba84a6a 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -304,6 +304,8 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SEventWindowNode)); case QUERY_NODE_HINT: return makeNode(type, sizeof(SHintNode)); + case QUERY_NODE_VIEW: + return makeNode(type, sizeof(SViewNode)); case QUERY_NODE_SET_OPERATOR: return makeNode(type, sizeof(SSetOperator)); case QUERY_NODE_SELECT_STMT: @@ -467,6 +469,10 @@ SNode* nodesMakeNode(ENodeType type) { case QUERY_NODE_RESTORE_MNODE_STMT: case QUERY_NODE_RESTORE_VNODE_STMT: return makeNode(type, sizeof(SRestoreComponentNodeStmt)); + case QUERY_NODE_CREATE_VIEW_STMT: + return makeNode(type, sizeof(SCreateViewStmt)); + case QUERY_NODE_DROP_VIEW_STMT: + return makeNode(type, sizeof(SDropViewStmt)); case QUERY_NODE_LOGIC_PLAN_SCAN: return makeNode(type, sizeof(SScanLogicNode)); case QUERY_NODE_LOGIC_PLAN_JOIN: @@ -832,6 +838,13 @@ void nodesDestroyNode(SNode* pNode) { destroyHintValue(pHint->option, pHint->value); break; } + case QUERY_NODE_VIEW: { + SViewNode* pView = (SViewNode*)pNode; + taosMemoryFreeClear(pView->pMeta); + taosMemoryFreeClear(pView->pVgroupList); + taosArrayDestroyEx(pView->pSmaIndexes, destroySmaIndex); + break; + } case QUERY_NODE_SET_OPERATOR: { SSetOperator* pStmt = (SSetOperator*)pNode; nodesDestroyList(pStmt->pProjectionList); @@ -1115,6 +1128,13 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_RESTORE_MNODE_STMT: // no pointer field case QUERY_NODE_RESTORE_VNODE_STMT: // no pointer field break; + case QUERY_NODE_CREATE_VIEW_STMT: { + SCreateViewStmt* pStmt = (SCreateViewStmt*)pNode; + nodesDestroyNode(pStmt->pQuery); + break; + } + case QUERY_NODE_DROP_VIEW_STMT: + break; case QUERY_NODE_LOGIC_PLAN_SCAN: { SScanLogicNode* pLogicNode = (SScanLogicNode*)pNode; destroyLogicNode((SLogicNode*)pLogicNode); diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index 719e7ba08c..7ebdc496fd 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -118,6 +118,7 @@ SNode* createNodeListNodeEx(SAstCreateContext* pCxt, SNode* p1, SNode* p2); SNode* createRealTableNode(SAstCreateContext* pCxt, SToken* pDbName, SToken* pTableName, SToken* pTableAlias); SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const SToken* pTableAlias); SNode* createJoinTableNode(SAstCreateContext* pCxt, EJoinType type, SNode* pLeft, SNode* pRight, SNode* pJoinCond); +SNode* createViewNode(SAstCreateContext* pCxt, SToken* pDbName, SToken* pViewName); SNode* createLimitNode(SAstCreateContext* pCxt, const SToken* pLimit, const SToken* pOffset); SNode* createOrderByExprNode(SAstCreateContext* pCxt, SNode* pExpr, EOrder order, ENullOrder nullOrder); SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode* pGap); diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index 41cf45906f..c963984f85 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -69,6 +69,7 @@ typedef struct SParseMetaCache { SHashObj* pUdf; // key is funcName, element is SFuncInfo* SHashObj* pTableIndex; // key is tbFName, element is SArray* SHashObj* pTableCfg; // key is tbFName, element is STableCfg* + SHashObj* pViews; // key is viewFName, element is SViewInfo* SArray* pDnodes; // element is SEpSet bool dnodeRequired; } SParseMetaCache; @@ -93,6 +94,8 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache); int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache); int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache); +int32_t reserveViewMetaInCache(int32_t acctId, const char* pDb, const char* pView, SParseMetaCache* pMetaCache); +int32_t reserveViewMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache); int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache); int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache); int32_t reserveTableVgroupInCacheExt(const SName* pName, SParseMetaCache* pMetaCache); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 6e2521e5b9..f6d949176c 100755 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -622,6 +622,14 @@ language_opt(A) ::= LANGUAGE NK_STRING(B). or_replace_opt(A) ::= . { A = false; } or_replace_opt(A) ::= OR REPLACE. { A = true; } +/************************************************ create/drop view **************************************************/ +cmd ::= CREATE or_replace_opt(A) VIEW full_view_name(B) col_list_opt(C) AS query_or_subquery(D). + { pCxt->pRootNode = createCreateViewStmt(pCxt, A, B, C, D); } +cmd ::= DROP VIEW exists_opt(A) full_view_name(B). { pCxt->pRootNode = createDropViewStmt(pCxt, A, B); } + +full_view_name(A) ::= view_name(B). { A = createViewNode(pCxt, NULL, &B, NULL); } +full_view_name(A) ::= db_name(B) NK_DOT view_name(C). { A = createViewNode(pCxt, &B, &C, NULL); } + /************************************************ create/drop stream **************************************************/ cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO full_table_name(C) col_list_opt(H) tag_def_or_ref_opt(F) subtable_opt(G) @@ -750,6 +758,10 @@ column_name(A) ::= NK_ID(B). %destructor function_name { } function_name(A) ::= NK_ID(B). { A = B; } +%type view_name { SToken } +%destructor view_name { } +view_name(A) ::= NK_ID(B). { A = B; } + %type table_alias { SToken } %destructor table_alias { } table_alias(A) ::= NK_ID(B). { A = B; } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 2a3235b4f5..dbbb05e9b1 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -219,6 +219,16 @@ static bool checkCGroupName(SAstCreateContext* pCxt, SToken* pCGroup) { return true; } +static bool checkViewName(SAstCreateContext* pCxt, SToken* pViewName) { + trimEscape(pViewName); + if (pViewName->n >= TSDB_VIEW_NAME_LEN) { + pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pViewName->z); + return false; + } + return true; +} + + static bool checkStreamName(SAstCreateContext* pCxt, SToken* pStreamName) { trimEscape(pStreamName); if (pStreamName->n >= TSDB_STREAM_NAME_LEN) { @@ -715,6 +725,23 @@ SNode* createJoinTableNode(SAstCreateContext* pCxt, EJoinType type, SNode* pLeft return (SNode*)joinTable; } +SNode* createViewNode(SAstCreateContext* pCxt, SToken* pDbName, SToken* pViewName) { + CHECK_PARSER_STATUS(pCxt); + if (!checkDbName(pCxt, pDbName, true) || !checkViewName(pCxt, pViewName)) { + return NULL; + } + SViewNode* pView = (SViewNode*)nodesMakeNode(QUERY_NODE_VIEW); + CHECK_OUT_OF_MEM(pView); + if (NULL != pDbName) { + COPY_STRING_FORM_ID_TOKEN(pView->table.dbName, pDbName); + } else { + snprintf(pView->table.dbName, sizeof(pView->table.dbName), "%s", pCxt->pQueryCxt->db); + } + COPY_STRING_FORM_ID_TOKEN(pView->table.tableName, pViewName); + return (SNode*)pView; +} + + SNode* createLimitNode(SAstCreateContext* pCxt, const SToken* pLimit, const SToken* pOffset) { CHECK_PARSER_STATUS(pCxt); SLimitNode* limitNode = (SLimitNode*)nodesMakeNode(QUERY_NODE_LIMIT); @@ -2126,6 +2153,30 @@ SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, con return (SNode*)pStmt; } +SNode* createCreateViewStmt(SAstCreateContext* pCxt, bool orReplace, SNode* pView, SNodeList* pCols, SNode* pQuery) { + CHECK_PARSER_STATUS(pCxt); + SCreateViewStmt* pStmt = (SCreateViewStmt*)nodesMakeNode(QUERY_NODE_CREATE_VIEW_STMT); + CHECK_OUT_OF_MEM(pStmt); + strcpy(pStmt->dbName, ((SViewNode*)pView)->table.dbName); + strcpy(pStmt->viewName, ((SViewNode*)pView)->table.tableName); + nodesDestroyNode(pView); + pStmt->orReplace = orReplace; + pStmt->pQuery = pQuery; + pStmt->pCols = pCols; + return (SNode*)pStmt; +} + +SNode* createDropViewStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pView) { + CHECK_PARSER_STATUS(pCxt); + SDropViewStmt* pStmt = (SDropViewStmt*)nodesMakeNode(QUERY_NODE_DROP_VIEW_STMT); + CHECK_OUT_OF_MEM(pStmt); + pStmt->ignoreNotExists = ignoreNotExists; + strcpy(pStmt->dbName, ((SViewNode*)pView)->table.dbName); + strcpy(pStmt->viewName, ((SViewNode*)pView)->table.tableName); + nodesDestroyNode(pView); + return (SNode*)pStmt; +} + SNode* createStreamOptions(SAstCreateContext* pCxt) { CHECK_PARSER_STATUS(pCxt); SStreamOptions* pOptions = (SStreamOptions*)nodesMakeNode(QUERY_NODE_STREAM_OPTIONS); diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 784281ebae..c6cd9c7d12 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -646,6 +646,32 @@ static int32_t collectMetaKeyFromGrant(SCollectMetaKeyCxt* pCxt, SGrantStmt* pSt return reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->objName, pStmt->tabName, pCxt->pMetaCache); } +static int32_t collectMetaKeyFromCreateViewStmt(SCollectMetaKeyCxt* pCxt, SCreateViewStmt* pStmt) { + int32_t code = + reserveViewMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache); + if (TSDB_CODE_SUCCESS == code) { + code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery); + } + if (TSDB_CODE_SUCCESS == code) { + code = reserveUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, NULL, AUTH_TYPE_WRITE, + pCxt->pMetaCache); + } + + return code; +} + +static int32_t collectMetaKeyFromDropViewStmt(SCollectMetaKeyCxt* pCxt, SDropViewStmt* pStmt) { + int32_t code = + reserveViewMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache); + if (TSDB_CODE_SUCCESS == code) { + code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, pStmt->viewName, AUTH_TYPE_WRITE, + pCxt->pMetaCache); + } + + return code; +} + + static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { pCxt->pStmt = pStmt; switch (nodeType(pStmt)) { @@ -754,6 +780,10 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromShowBlockDist(pCxt, (SShowTableDistributedStmt*)pStmt); case QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT: return collectMetaKeyFromShowSubscriptions(pCxt, (SShowStmt*)pStmt); + case QUERY_NODE_CREATE_VIEW_STMT: + return collectMetaKeyFromCreateViewStmt(pCxt, (SCreateViewStmt*)pStmt); + case QUERY_NODE_CREATE_VIEW_STMT: + return collectMetaKeyFromDropViewStmt(pCxt, (SDropViewStmt*)pStmt); default: break; } diff --git a/source/libs/parser/src/parAuthenticator.c b/source/libs/parser/src/parAuthenticator.c index 9b2ac662c8..9fbd08c46b 100644 --- a/source/libs/parser/src/parAuthenticator.c +++ b/source/libs/parser/src/parAuthenticator.c @@ -35,7 +35,7 @@ typedef struct SAuthRewriteCxt { static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt); static void setUserAuthInfo(SParseContext* pCxt, const char* pDbName, const char* pTabName, AUTH_TYPE type, - SUserAuthInfo* pAuth) { + bool isView, SUserAuthInfo* pAuth) { snprintf(pAuth->user, sizeof(pAuth->user), "%s", pCxt->pUser); if (NULL == pTabName) { tNameSetDbName(&pAuth->tbName, pCxt->acctId, pDbName, strlen(pDbName)); @@ -43,16 +43,17 @@ static void setUserAuthInfo(SParseContext* pCxt, const char* pDbName, const char toName(pCxt->acctId, pDbName, pTabName, &pAuth->tbName); } pAuth->type = type; + pAuth->isView = isView; } -static int32_t checkAuth(SAuthCxt* pCxt, const char* pDbName, const char* pTabName, AUTH_TYPE type, SNode** pCond) { +static int32_t checkAuthImpl(SAuthCxt* pCxt, const char* pDbName, const char* pTabName, AUTH_TYPE type, SNode** pCond, bool isView) { SParseContext* pParseCxt = pCxt->pParseCxt; if (pParseCxt->isSuperUser) { return TSDB_CODE_SUCCESS; } SUserAuthInfo authInfo = {0}; - setUserAuthInfo(pCxt->pParseCxt, pDbName, pTabName, type, &authInfo); + setUserAuthInfo(pCxt->pParseCxt, pDbName, pTabName, type, isView, &authInfo); int32_t code = TSDB_CODE_SUCCESS; SUserAuthRes authRes = {0}; if (NULL != pCxt->pMetaCache) { @@ -70,6 +71,16 @@ static int32_t checkAuth(SAuthCxt* pCxt, const char* pDbName, const char* pTabNa return TSDB_CODE_SUCCESS == code ? (authRes.pass ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED) : code; } + +static int32_t checkAuth(SAuthCxt* pCxt, const char* pDbName, const char* pTabName, AUTH_TYPE type, SNode** pCond) { + return checkAuthImpl(pCxt, pDbName, pTabName, type, pCond, false); +} + +static int32_t checkViewAuth(SAuthCxt* pCxt, const char* pDbName, const char* pTabName, AUTH_TYPE type, SNode** pCond) { + return checkAuthImpl(pCxt, pDbName, pTabName, type, pCond, true); +} + + static EDealRes authSubquery(SAuthCxt* pCxt, SNode* pStmt) { return TSDB_CODE_SUCCESS == authQuery(pCxt, pStmt) ? DEAL_RES_CONTINUE : DEAL_RES_ERROR; } @@ -238,6 +249,14 @@ static int32_t authAlterTable(SAuthCxt* pCxt, SAlterTableStmt* pStmt) { return checkAuth(pCxt, pStmt->dbName, pStmt->tableName, AUTH_TYPE_WRITE, NULL); } +static int32_t authCreateView(SAuthCxt* pCxt, SCreateViewStmt* pStmt) { + return checkAuth(pCxt, pStmt->dbName, NULL, AUTH_TYPE_WRITE, NULL); +} + +static int32_t authDropView(SAuthCxt* pCxt, SDropViewStmt* pStmt) { + return checkViewAuth(pCxt, pStmt->dbName, pStmt->viewName, AUTH_TYPE_WRITE, NULL); +} + static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) { switch (nodeType(pStmt)) { case QUERY_NODE_SET_OPERATOR: @@ -283,6 +302,10 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) { case QUERY_NODE_SHOW_CREATE_TABLE_STMT: case QUERY_NODE_SHOW_CREATE_STABLE_STMT: return authShowCreateTable(pCxt, (SShowCreateTableStmt*)pStmt); + case QUERY_NODE_CREATE_VIEW_STMT: + return authCreateView(pCxt, (SCreateViewStmt*)pStmt); + case QUERY_NODE_DROP_VIEW_STMT: + return authDropView(pCxt, (SDropViewStmt*)pStmt); default: break; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 3a6829da56..3c3ed31a1f 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -7262,6 +7262,42 @@ static int32_t translateResumeStream(STranslateContext* pCxt, SResumeStreamStmt* return buildCmdMsg(pCxt, TDMT_MND_RESUME_STREAM, (FSerializeFunc)tSerializeSMResumeStreamReq, &req); } +static int32_t validateCreateView(STranslateContext* pCxt, SCreateViewStmt* pStmt) { + if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type"); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t translateCreateView(STranslateContext* pCxt, SCreateViewStmt* pStmt) { + int32_t code = validateCreateView(pCxt, pStmt); + if (TSDB_CODE_SUCCESS == code) { + code = (*pCxt->pParseCxt->validateSqlFp)(pCxt->pParseCxt->validateSqlParam, pStmt->pQuery, pCxt->pParseCxt->pSql, &pStmt->createReq); + } + if (TSDB_CODE_SUCCESS == code) { + strncpy(pStmt->createReq.name, pStmt->viewName, sizeof(pStmt->createReq.name) - 1); + snprintf(pStmt->createReq.dbFName, sizeof(pStmt->createReq.dbFName) - 1, "%d.%s", pCxt->pParseCxt->acctId, pStmt->dbName); + pStmt->createReq.orReplace = pStmt->orReplace; + + code = buildCmdMsg(pCxt, TDMT_MND_CREATE_VIEW, (FSerializeFunc)tSerializeSCMCreateViewReq, &pStmt->createReq); + } + + tFreeSCMCreateViewReq(&pStmt->createReq); + return code; +} + + +static int32_t translateDropView(STranslateContext* pCxt, SDropViewStmt* pStmt) { + SMDropViewReq dropReq = {0}; + SName name; + tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName)); + tNameGetFullDbName(&name, dropReq.dbFName); + dropReq.igNotExists = pStmt->ignoreNotExists; + return buildCmdMsg(pCxt, TDMT_MND_DROP_VIEW, (FSerializeFunc)tSerializeSMDropViewReq, &dropReq); +} + + static int32_t readFromFile(char* pName, int32_t* len, char** buf) { int64_t filesize = 0; if (taosStatFile(pName, &filesize, NULL, NULL) < 0) { @@ -7693,6 +7729,13 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { case QUERY_NODE_RESTORE_VNODE_STMT: code = translateRestoreDnode(pCxt, (SRestoreComponentNodeStmt*)pNode); break; + case QUERY_NODE_CREATE_VIEW_STMT: + code = translateCreateView(pCxt, (SCreateStreamStmt*)pNode); + break; + case QUERY_NODE_DROP_VIEW_STMT: + code = translateDropView(pCxt, (SCreateStreamStmt*)pNode); + break; + default: break; } diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 1b8bac4cbc..dec4bd2e51 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -519,9 +519,9 @@ int32_t getVnodeSysTableTargetName(int32_t acctId, SNode* pWhere, SName* pName) } static int32_t userAuthToString(int32_t acctId, const char* pUser, const char* pDb, const char* pTable, AUTH_TYPE type, - char* pStr) { - return sprintf(pStr, "%s*%d*%s*%s*%d", pUser, acctId, pDb, (NULL == pTable || '\0' == pTable[0]) ? "``" : pTable, - type); + char* pStr, bool isView) { + return sprintf(pStr, "%s*%d*%s*%s*%d*%d", pUser, acctId, pDb, (NULL == pTable || '\0' == pTable[0]) ? "``" : pTable, + type, isView); } static int32_t getIntegerFromAuthStr(const char* pStart, char** pNext) { @@ -691,6 +691,9 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog if (TSDB_CODE_SUCCESS == code) { code = buildTableReq(pMetaCache->pTableCfg, &pCatalogReq->pTableCfg); } + if (TSDB_CODE_SUCCESS == code) { + code = buildTableReqFromDb(pMetaCache->pViews, &pCatalogReq->pView); + } pCatalogReq->dNodeRequired = pMetaCache->dnodeRequired; return code; } @@ -781,7 +784,7 @@ static int32_t putUserAuthToCache(const SArray* pUserAuthReq, const SArray* pUse SUserAuthInfo* pUser = taosArrayGet(pUserAuthReq, i); char key[USER_AUTH_KEY_MAX_LEN] = {0}; int32_t len = userAuthToString(pUser->tbName.acctId, pUser->user, pUser->tbName.dbname, pUser->tbName.tname, - pUser->type, key); + pUser->type, key, pUser->isView); if (TSDB_CODE_SUCCESS != putMetaDataToHash(key, len, pUserAuthData, i, pUserAuth)) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -826,6 +829,9 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet if (TSDB_CODE_SUCCESS == code) { code = putTableDataToCache(pCatalogReq->pTableCfg, pMetaData->pTableCfg, &pMetaCache->pTableCfg); } + if (TSDB_CODE_SUCCESS == code) { + code = putDbTableDataToCache(pCatalogReq->pView, pMetaData->pView, &pMetaCache->pViews); + } pMetaCache->pDnodes = pMetaData->pDnodeList; return code; } @@ -880,6 +886,14 @@ int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCac return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pTableMeta); } +int32_t reserveViewMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) { + return reserveTableReqInDbCache(acctId, pDb, pTable, &pMetaCache->pViews); +} + +int32_t reserveViewMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) { + return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pViews); +} + int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) { char fullName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pName, fullName); @@ -985,14 +999,23 @@ static int32_t reserveUserAuthInCacheImpl(const char* pKey, int32_t len, SParseM int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, const char* pTable, AUTH_TYPE type, SParseMetaCache* pMetaCache) { char key[USER_AUTH_KEY_MAX_LEN] = {0}; - int32_t len = userAuthToString(acctId, pUser, pDb, pTable, type, key); + int32_t len = userAuthToString(acctId, pUser, pDb, pTable, type, key, false); return reserveUserAuthInCacheImpl(key, len, pMetaCache); } +int32_t reserveViewUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, const char* pTable, AUTH_TYPE type, + SParseMetaCache* pMetaCache) { + char key[USER_AUTH_KEY_MAX_LEN] = {0}; + int32_t len = userAuthToString(acctId, pUser, pDb, pTable, type, key, true); + return reserveUserAuthInCacheImpl(key, len, pMetaCache); +} + + + int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, SUserAuthInfo* pAuthReq, SUserAuthRes* pAuthRes) { char key[USER_AUTH_KEY_MAX_LEN] = {0}; int32_t len = userAuthToString(pAuthReq->tbName.acctId, pAuthReq->user, pAuthReq->tbName.dbname, - pAuthReq->tbName.tname, pAuthReq->type, key); + pAuthReq->tbName.tname, pAuthReq->type, key, pAuthReq->isView); SUserAuthRes* pAuth = NULL; int32_t code = getMetaDataFromHash(key, len, pMetaCache->pUserAuth, (void**)&pAuth); if (TSDB_CODE_SUCCESS == code) { @@ -1135,9 +1158,11 @@ void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) { if (request) { destoryParseTablesMetaReqHash(pMetaCache->pTableMeta); destoryParseTablesMetaReqHash(pMetaCache->pTableVgroup); + destoryParseTablesMetaReqHash(pMetaCache->pViews); } else { taosHashCleanup(pMetaCache->pTableMeta); taosHashCleanup(pMetaCache->pTableVgroup); + taosHashCleanup(pMetaCache->pViews); } taosHashCleanup(pMetaCache->pDbVgroup); taosHashCleanup(pMetaCache->pDbCfg); diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp index 856fdb4804..92b2f510b6 100644 --- a/source/libs/parser/test/parInitialCTest.cpp +++ b/source/libs/parser/test/parInitialCTest.cpp @@ -454,6 +454,130 @@ TEST_F(ParserInitialCTest, createFunction) { run("CREATE OR REPLACE AGGREGATE FUNCTION IF NOT EXISTS udf2 AS 'udf' OUTPUTTYPE DOUBLE BUFSIZE 8 LANGUAGE 'python'"); } +/* + * CREATE [ OR REPLACE ] VIEW name [ ( column_name [, ...] ) ] AS query + * + */ +TEST_F(ParserInitialCTest, createView) { + useDb("root", "test"); + + SCMCreateStreamReq expect = {0}; + + auto clearCreateStreamReq = [&]() { + tFreeSCMCreateStreamReq(&expect); + memset(&expect, 0, sizeof(SCMCreateStreamReq)); + }; + + auto setCreateStreamReq = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb, + int8_t igExists = 0) { + snprintf(expect.name, sizeof(expect.name), "0.%s", pStream); + snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb); + snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb); + expect.igExists = igExists; + expect.sql = taosStrdup(pSql); + }; + + auto setStreamOptions = + [&](int8_t createStb = STREAM_CREATE_STABLE_TRUE, int8_t triggerType = STREAM_TRIGGER_WINDOW_CLOSE, + int64_t maxDelay = 0, int64_t watermark = 0, int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED, + int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY, int8_t igUpdate = STREAM_DEFAULT_IGNORE_UPDATE) { + expect.createStb = createStb; + expect.triggerType = triggerType; + expect.maxDelay = maxDelay; + expect.watermark = watermark; + expect.fillHistory = fillHistory; + expect.igExpired = igExpired; + expect.igUpdate = igUpdate; + }; + + auto addTag = [&](const char* pFieldName, uint8_t type, int32_t bytes = 0) { + SField field = {0}; + strcpy(field.name, pFieldName); + field.type = type; + field.bytes = bytes > 0 ? bytes : tDataTypes[type].bytes; + field.flags |= COL_SMA_ON; + + if (NULL == expect.pTags) { + expect.pTags = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SField)); + } + taosArrayPush(expect.pTags, &field); + expect.numOfTags += 1; + }; + + setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) { + ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_STREAM_STMT); + SCMCreateStreamReq req = {0}; + ASSERT_TRUE(TSDB_CODE_SUCCESS == + tDeserializeSCMCreateStreamReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req)); + + ASSERT_EQ(std::string(req.name), std::string(expect.name)); + ASSERT_EQ(std::string(req.sourceDB), std::string(expect.sourceDB)); + ASSERT_EQ(std::string(req.targetStbFullName), std::string(expect.targetStbFullName)); + ASSERT_EQ(req.igExists, expect.igExists); + ASSERT_EQ(std::string(req.sql), std::string(expect.sql)); + ASSERT_EQ(req.triggerType, expect.triggerType); + ASSERT_EQ(req.maxDelay, expect.maxDelay); + ASSERT_EQ(req.watermark, expect.watermark); + ASSERT_EQ(req.fillHistory, expect.fillHistory); + ASSERT_EQ(req.igExpired, expect.igExpired); + ASSERT_EQ(req.numOfTags, expect.numOfTags); + if (expect.numOfTags > 0) { + ASSERT_EQ(taosArrayGetSize(req.pTags), expect.numOfTags); + ASSERT_EQ(taosArrayGetSize(req.pTags), taosArrayGetSize(expect.pTags)); + for (int32_t i = 0; i < expect.numOfTags; ++i) { + SField* pField = (SField*)taosArrayGet(req.pTags, i); + SField* pExpectField = (SField*)taosArrayGet(expect.pTags, i); + ASSERT_EQ(std::string(pField->name), std::string(pExpectField->name)); + ASSERT_EQ(pField->type, pExpectField->type); + ASSERT_EQ(pField->bytes, pExpectField->bytes); + ASSERT_EQ(pField->flags, pExpectField->flags); + } + } + ASSERT_EQ(req.checkpointFreq, expect.checkpointFreq); + ASSERT_EQ(req.createStb, expect.createStb); + ASSERT_EQ(req.igUpdate, expect.igUpdate); + tFreeSCMCreateStreamReq(&req); + }); + + setCreateStreamReq("s1", "test", "create stream s1 into st3 as select count(*) from t1 interval(10s)", "st3"); + setStreamOptions(); + run("CREATE STREAM s1 INTO st3 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)"); + clearCreateStreamReq(); + + setCreateStreamReq( + "s1", "test", + "create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 0 ignore " + "update 1 into st3 as select count(*) from t1 interval(10s)", + "st3", 1); + setStreamOptions(STREAM_CREATE_STABLE_TRUE, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, + 10 * MILLISECOND_PER_SECOND, 0, 0, 1); + run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 0 IGNORE " + "UPDATE 1 INTO st3 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)"); + clearCreateStreamReq(); + + setCreateStreamReq("s1", "test", + "create stream s1 into st3 tags(tname varchar(10), id int) subtable(concat('new-', tname)) as " + "select _wstart wstart, count(*) cnt from st1 partition by tbname tname, tag1 id interval(10s)", + "st3"); + addTag("tname", TSDB_DATA_TYPE_VARCHAR, 10 + VARSTR_HEADER_SIZE); + addTag("id", TSDB_DATA_TYPE_INT); + setStreamOptions(); + run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) " + "AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)"); + clearCreateStreamReq(); + + // st1 already exists + setCreateStreamReq( + "s1", "test", + "create stream s1 into st1 tags(tag2) as select max(c1), c2 from t1 partition by tbname tag2 interval(10s)", + "st1"); + setStreamOptions(STREAM_CREATE_STABLE_FALSE); + run("CREATE STREAM s1 INTO st1 TAGS(tag2) AS SELECT MAX(c1), c2 FROM t1 PARTITION BY TBNAME tag2 INTERVAL(10S)"); + clearCreateStreamReq(); +} + + + /* * CREATE MNODE ON DNODE dnode_id */ diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0fff280c59..baa1b8497d 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -575,6 +575,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, "System table not al TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VARBINARY, "Invalidate varbinary value") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_IP_RANGE, "Invalid IPV4 address ranges") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream query") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type") //planner TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "Planner internal error")