feat: support create/drop view

This commit is contained in:
dapan1121 2023-09-21 19:28:07 +08:00
parent cc08a5ee87
commit a67b0c99fc
25 changed files with 165 additions and 44 deletions

View File

@ -3903,6 +3903,7 @@ typedef struct {
} SPackedData; } SPackedData;
typedef struct { typedef struct {
char fullname[TSDB_VIEW_FNAME_LEN];
char name[TSDB_VIEW_NAME_LEN]; char name[TSDB_VIEW_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
char* querySql; char* querySql;
@ -3914,17 +3915,14 @@ typedef struct {
SRWLatch lock; SRWLatch lock;
} SCMCreateViewReq; } SCMCreateViewReq;
typedef struct {
int64_t streamId;
} SCMCreateViewRsp;
int32_t tSerializeSCMCreateViewReq(void* buf, int32_t bufLen, const SCMCreateViewReq* pReq); int32_t tSerializeSCMCreateViewReq(void* buf, int32_t bufLen, const SCMCreateViewReq* pReq);
int32_t tDeserializeSCMCreateViewReq(void* buf, int32_t bufLen, SCMCreateViewReq* pReq); int32_t tDeserializeSCMCreateViewReq(void* buf, int32_t bufLen, SCMCreateViewReq* pReq);
void tFreeSCMCreateViewReq(SCMCreateViewReq* pReq); void tFreeSCMCreateViewReq(SCMCreateViewReq* pReq);
typedef struct { typedef struct {
char fullname[TSDB_VIEW_FNAME_LEN];
char name[TSDB_VIEW_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
char viewName[TSDB_VIEW_NAME_LEN];
char* sql; char* sql;
int8_t igNotExists; int8_t igNotExists;
} SCMDropViewReq; } SCMDropViewReq;

View File

@ -189,7 +189,8 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_VIEW, "create-view", SCMCreateViewReq, SCMCreateViewRsp) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_VIEW, "create-view", SCMCreateViewReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_VIEW, "drop-view", SCMDropViewReq, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)

View File

@ -359,6 +359,7 @@
#define TK_NK_SPACE 600 #define TK_NK_SPACE 600
#define TK_NK_COMMENT 601 #define TK_NK_COMMENT 601
#define TK_NK_ILLEGAL 602 #define TK_NK_ILLEGAL 602

View File

@ -496,6 +496,7 @@ typedef struct SCreateViewStmt {
ENodeType type; ENodeType type;
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
char viewName[TSDB_VIEW_NAME_LEN]; char viewName[TSDB_VIEW_NAME_LEN];
char* pQuerySql;
bool orReplace; bool orReplace;
SNodeList* pCols; SNodeList* pCols;
SNode* pQuery; SNode* pQuery;

View File

@ -33,7 +33,7 @@ typedef struct SStmtCallback {
int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**); int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**);
} SStmtCallback; } SStmtCallback;
typedef int32_t (*validateSqlFn)(void* param, SQuery* pQuery, const char* sql, SQueryResInfo* pRes); typedef int32_t (*validateSqlFn)(void* param, SQuery* pQuery, const char* sql, SCMCreateViewReq* pRes);
typedef struct SParseCsvCxt { typedef struct SParseCsvCxt {
TdFilePtr fp; // last parsed file TdFilePtr fp; // last parsed file

View File

@ -300,6 +300,7 @@ void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp,
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
int64_t reqid); int64_t reqid);
void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param); void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param);
int32_t clientValidateSql(void* param, SQuery* pQuery, const char* sql, SCMCreateViewReq* pReq);
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols); int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols);

View File

@ -2592,6 +2592,12 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param
schedulerFetchRows(pRequest->body.queryJob, &req); schedulerFetchRows(pRequest->body.queryJob, &req);
} }
int32_t asyncValidateSql(void* param) {
doAsyncQuery((SRequestObj*)param, false);
return TSDB_CODE_SUCCESS;
}
int32_t clientValidateSql(void* param, SQuery* pQuery, const char* sql, SCMCreateViewReq* pReq) { int32_t clientValidateSql(void* param, SQuery* pQuery, const char* sql, SCMCreateViewReq* pReq) {
SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param; SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param;
SSyncQueryParam* syncParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); SSyncQueryParam* syncParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
@ -2605,9 +2611,14 @@ int32_t clientValidateSql(void* param, SQuery* pQuery, const char* sql, SCMCreat
return code; return code;
} }
pNewRequest->pQuery = pQuery; //pNewRequest->pQuery = pQuery;
pNewRequest->body.queryFp = syncQueryFn; pNewRequest->body.queryFp = syncQueryFn;
doAsyncQuery(pNewRequest, false);
code = taosAsyncExec(asyncValidateSql, pNewRequest, NULL);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to sched async validate sql");
return code;
}
tsem_wait(&syncParam->sem); tsem_wait(&syncParam->sem);

View File

@ -32,7 +32,7 @@
#define TSC_VAR_RELEASED 0 #define TSC_VAR_RELEASED 0
static int32_t sentinel = TSC_VAR_NOT_RELEASE; static int32_t sentinel = TSC_VAR_NOT_RELEASE;
static int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt); static int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt, SSqlCallbackWrapper *pWrapper);
int taos_options(TSDB_OPTION option, const void *arg, ...) { int taos_options(TSDB_OPTION option, const void *arg, ...) {
static int32_t lock = 0; static int32_t lock = 0;

View File

@ -8431,8 +8431,10 @@ int32_t tSerializeSCMCreateViewReq(void *buf, int32_t bufLen, const SCMCreateVie
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->fullname) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->dbFName) < 0) return -1; if (tEncodeCStr(&encoder, pReq->dbFName) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->querySql) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (tEncodeI8(&encoder, pReq->orReplace) < 0) return -1; if (tEncodeI8(&encoder, pReq->orReplace) < 0) return -1;
if (tEncodeI8(&encoder, pReq->precision) < 0) return -1; if (tEncodeI8(&encoder, pReq->precision) < 0) return -1;
@ -8454,8 +8456,10 @@ int32_t tDeserializeSCMCreateViewReq(void *buf, int32_t bufLen, SCMCreateViewReq
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->fullname) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->dbFName) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->dbFName) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->querySql) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1; if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->orReplace) < 0) return -1; if (tDecodeI8(&decoder, &pReq->orReplace) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1; if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1;
@ -8485,12 +8489,52 @@ void tFreeSCMCreateViewReq(SCMCreateViewReq* pReq) {
return; return;
} }
taosMemoryFree(pReq->querySql);
taosMemoryFree(pReq->sql);
taosMemoryFree(pReq->pSchema); taosMemoryFree(pReq->pSchema);
} }
int32_t tSerializeSCMDropViewReq(void* buf, int32_t bufLen, const SCMDropViewReq* pReq); int32_t tSerializeSCMDropViewReq(void* buf, int32_t bufLen, const SCMDropViewReq* pReq) {
int32_t tDeserializeSCMDropViewReq(void* buf, int32_t bufLen, SCMDropViewReq* pReq); SEncoder encoder = {0};
void tFreeSCMDropViewReq(SCMDropViewReq* pReq); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->fullname) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->dbFName) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSCMDropViewReq(void* buf, int32_t bufLen, SCMDropViewReq* pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->fullname) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->dbFName) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSCMDropViewReq(SCMDropViewReq* pReq) {
if (NULL == pReq) {
return;
}
taosMemoryFree(pReq->sql);
}

View File

@ -186,6 +186,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_INDEX, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_INDEX, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RESTORE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RESTORE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_VIEW, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_VIEW, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_VIEW, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;

View File

@ -7,6 +7,7 @@ IF (TD_ENTERPRISE)
LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/mnode/src/mndDb.c) LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/mnode/src/mndDb.c)
LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/mnode/src/mndVgroup.c) LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/mnode/src/mndVgroup.c)
LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/mnode/src/mndDnode.c) LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/mnode/src/mndDnode.c)
LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/view/src/mndView.c)
ENDIF () ENDIF ()
add_library(mnode STATIC ${MNODE_SRC}) add_library(mnode STATIC ${MNODE_SRC})

View File

@ -25,11 +25,28 @@ extern "C" {
int32_t mndInitView(SMnode *pMnode); int32_t mndInitView(SMnode *pMnode);
void mndCleanupView(SMnode *pMnode); void mndCleanupView(SMnode *pMnode);
int32_t mndProcessCreateViewReq(SRpcMsg *pReq);
int32_t mndProcessDropViewReq(SRpcMsg *pReq);
#ifdef TD_ENTERPRISE
SViewObj *mndAcquireView(SMnode *pMnode, char *viewName); SViewObj *mndAcquireView(SMnode *pMnode, char *viewName);
void mndReleaseView(SMnode *pMnode, SViewObj *pView); void mndReleaseView(SMnode *pMnode, SViewObj *pView);
SSdbRaw *mndViewActionEncode(SViewObj *pView); SSdbRaw *mndViewActionEncode(SViewObj *pView);
SSdbRow *mndViewActionDecode(SSdbRaw *pRaw); SSdbRow *mndViewActionDecode(SSdbRaw *pRaw);
int32_t mndViewActionInsert(SSdb *pSdb, SViewObj *pView);
int32_t mndViewActionDelete(SSdb *pSdb, SViewObj *pView);
int32_t mndViewActionUpdate(SSdb *pSdb, SViewObj *pOldView, SViewObj *pNewView);
int32_t mndProcessCreateViewReqImpl(SCMCreateViewReq* pCreateView, SRpcMsg *pReq);
int32_t mndProcessDropViewReqImpl(SCMDropViewReq* pDropView, SRpcMsg *pReq);
#endif
int32_t mndRetrieveView(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
void mndCancelGetNextView(SMnode *pMnode, void *pIter);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -41,6 +41,7 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "mndView.h"
static inline int32_t mndAcquireRpc(SMnode *pMnode) { static inline int32_t mndAcquireRpc(SMnode *pMnode) {
int32_t code = 0; int32_t code = 0;
@ -447,6 +448,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1; if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1; if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-view", mndInitView, mndCleanupView) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1; if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1; if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;

View File

@ -14,11 +14,11 @@
*/ */
#include "mndView.h" #include "mndView.h"
#include "mndShow.h"
int32_t mndInitView(SMnode *pMnode) { int32_t mndInitView(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_VIEW, mndProcessCreateViewReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_VIEW, mndProcessCreateViewReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_VIEW, mndProcessDropViewReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_VIEW, mndProcessDropViewReq);
mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndRetrieveView); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndRetrieveView);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndCancelGetNextView); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndCancelGetNextView);
@ -41,13 +41,10 @@ int32_t mndInitView(SMnode *pMnode) {
} }
void mndCleanupView(SMnode *pMnode) { void mndCleanupView(SMnode *pMnode) {
taosArrayDestroy(execNodeList.pTaskList);
taosHashCleanup(execNodeList.pTaskMap);
taosThreadMutexDestroy(&execNodeList.lock);
mDebug("mnd view cleanup"); mDebug("mnd view cleanup");
} }
static int32_t mndProcessCreateViewReq(SRpcMsg *pReq) { int32_t mndProcessCreateViewReq(SRpcMsg *pReq) {
#ifndef TD_ENTERPRISE #ifndef TD_ENTERPRISE
return TSDB_CODE_OPS_NOT_SUPPORT; return TSDB_CODE_OPS_NOT_SUPPORT;
#else #else
@ -57,13 +54,13 @@ static int32_t mndProcessCreateViewReq(SRpcMsg *pReq) {
return -1; return -1;
} }
mInfo("start to create view:%s, sql:%s", createViewReq.name, createViewReq.sql); mInfo("start to create view:%s, sql:%s", createViewReq.fullname, createViewReq.sql);
return mndProcessCreateViewReqImpl(&createViewReq, pReq); return mndProcessCreateViewReqImpl(&createViewReq, pReq);
#endif #endif
} }
static int32_t mndProcessDropViewReq(SRpcMsg *pReq) { int32_t mndProcessDropViewReq(SRpcMsg *pReq) {
#ifndef TD_ENTERPRISE #ifndef TD_ENTERPRISE
return TSDB_CODE_OPS_NOT_SUPPORT; return TSDB_CODE_OPS_NOT_SUPPORT;
#else #else
@ -73,13 +70,14 @@ static int32_t mndProcessDropViewReq(SRpcMsg *pReq) {
return -1; return -1;
} }
mInfo("start to drop view:%s, sql:%s", dropViewReq.viewName, dropViewReq.sql); mInfo("start to drop view:%s, sql:%s", dropViewReq.name, dropViewReq.sql);
return mndProcessDropViewReqImpl(&dropViewReq, pReq); return mndProcessDropViewReqImpl(&dropViewReq, pReq);
#endif #endif
} }
static int32_t mndRetrieveView(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { int32_t mndRetrieveView(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
#if 0
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
@ -149,9 +147,12 @@ static int32_t mndRetrieveView(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
pShow->numOfRows += numOfRows; pShow->numOfRows += numOfRows;
return numOfRows; return numOfRows;
#else
return 0;
#endif
} }
static void mndCancelGetNextView(SMnode *pMnode, void *pIter) { void mndCancelGetNextView(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }

View File

@ -62,6 +62,8 @@ const char *sdbTableName(ESdbType type) {
return "func"; return "func";
case SDB_IDX: case SDB_IDX:
return "idx"; return "idx";
case SDB_VIEW:
return "view";
default: default:
return "undefine"; return "undefine";
} }

View File

@ -248,6 +248,8 @@ SNode* createRevokeStmt(SAstCreateContext* pCxt, int64_t privileges, STokenPair*
SNode* pTagCond); SNode* pTagCond);
SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere); SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere);
SNode* createInsertStmt(SAstCreateContext* pCxt, SNode* pTable, SNodeList* pCols, SNode* pQuery); SNode* createInsertStmt(SAstCreateContext* pCxt, SNode* pTable, SNodeList* pCols, SNode* pQuery);
SNode* createCreateViewStmt(SAstCreateContext* pCxt, bool orReplace, SNode* pView, SNodeList* pCols, const SToken* pAs, SNode* pQuery);
SNode* createDropViewStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pView);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -36,6 +36,7 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery); int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow); int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow);
int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow); int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow);
int32_t buildQueryAfterParse(SQuery** pQuery, SNode* pRootNode, int16_t placeholderNo, SArray* pPlaceholderValues);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -103,6 +103,8 @@ int32_t reserveDbVgVersionInCache(int32_t acctId, const char* pDb, SParseMetaCac
int32_t reserveDbCfgInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache); int32_t reserveDbCfgInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, const char* pTable, AUTH_TYPE type, int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, const char* pTable, AUTH_TYPE type,
SParseMetaCache* pMetaCache); SParseMetaCache* pMetaCache);
int32_t reserveViewUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, const char* pTable, AUTH_TYPE type,
SParseMetaCache* pMetaCache);
int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache); int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache);
int32_t reserveTableIndexInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache); int32_t reserveTableIndexInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveTableCfgInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache); int32_t reserveTableCfgInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);

View File

@ -627,8 +627,8 @@ cmd ::= CREATE or_replace_opt(A) VIEW full_view_name(B) col_list_opt(C) AS(D) qu
{ pCxt->pRootNode = createCreateViewStmt(pCxt, A, B, C, &D, E); } { pCxt->pRootNode = createCreateViewStmt(pCxt, A, B, C, &D, E); }
cmd ::= DROP VIEW exists_opt(A) full_view_name(B). { pCxt->pRootNode = createDropViewStmt(pCxt, A, B); } cmd ::= DROP VIEW exists_opt(A) full_view_name(B). { pCxt->pRootNode = createDropViewStmt(pCxt, A, B); }
full_view_name(A) ::= view_name(B). { A = createViewNode(pCxt, NULL, &B, NULL); } full_view_name(A) ::= view_name(B). { A = createViewNode(pCxt, NULL, &B); }
full_view_name(A) ::= db_name(B) NK_DOT view_name(C). { A = createViewNode(pCxt, &B, &C, NULL); } full_view_name(A) ::= db_name(B) NK_DOT view_name(C). { A = createViewNode(pCxt, &B, &C); }
/************************************************ create/drop stream **************************************************/ /************************************************ create/drop stream **************************************************/
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO

View File

@ -2166,6 +2166,8 @@ SNode* createCreateViewStmt(SAstCreateContext* pCxt, bool orReplace, SNode* pVie
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
SCreateViewStmt* pStmt = (SCreateViewStmt*)nodesMakeNode(QUERY_NODE_CREATE_VIEW_STMT); SCreateViewStmt* pStmt = (SCreateViewStmt*)nodesMakeNode(QUERY_NODE_CREATE_VIEW_STMT);
CHECK_OUT_OF_MEM(pStmt); CHECK_OUT_OF_MEM(pStmt);
pStmt->pQuerySql = strdup(pAs->z + pAs->n);
CHECK_OUT_OF_MEM(pStmt->pQuerySql);
strcpy(pStmt->dbName, ((SViewNode*)pView)->table.dbName); strcpy(pStmt->dbName, ((SViewNode*)pView)->table.dbName);
strcpy(pStmt->viewName, ((SViewNode*)pView)->table.tableName); strcpy(pStmt->viewName, ((SViewNode*)pView)->table.tableName);
nodesDestroyNode(pView); nodesDestroyNode(pView);

View File

@ -29,6 +29,20 @@ extern void Parse(void*, int, SToken, void*);
extern void ParseFree(void*, FFree); extern void ParseFree(void*, FFree);
extern void ParseTrace(FILE*, char*); extern void ParseTrace(FILE*, char*);
int32_t buildQueryAfterParse(SQuery** pQuery, SNode* pRootNode, int16_t placeholderNo, SArray* pPlaceholderValues) {
*pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pQuery)->pRoot = pRootNode;
(*pQuery)->placeholderNum = placeholderNo;
(*pQuery)->pPlaceholderValues = pPlaceholderValues;
(*pQuery)->execStage = QUERY_EXEC_STAGE_ANALYSE;
return TSDB_CODE_SUCCESS;
}
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) { int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) {
SAstCreateContext cxt; SAstCreateContext cxt;
initAstCreateContext(pParseCxt, &cxt); initAstCreateContext(pParseCxt, &cxt);
@ -77,14 +91,10 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) {
abort_parse: abort_parse:
ParseFree(pParser, (FFree)taosMemoryFree); ParseFree(pParser, (FFree)taosMemoryFree);
if (TSDB_CODE_SUCCESS == cxt.errCode) { if (TSDB_CODE_SUCCESS == cxt.errCode) {
*pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); int32_t code = buildQueryAfterParse(pQuery, cxt.pRootNode, cxt.placeholderNo, cxt.pPlaceholderValues);
if (NULL == *pQuery) { if (TSDB_CODE_SUCCESS != code) {
return TSDB_CODE_OUT_OF_MEMORY; return code;
} }
(*pQuery)->pRoot = cxt.pRootNode;
(*pQuery)->placeholderNum = cxt.placeholderNo;
TSWAP((*pQuery)->pPlaceholderValues, cxt.pPlaceholderValues);
(*pQuery)->execStage = QUERY_EXEC_STAGE_ANALYSE;
} }
taosArrayDestroy(cxt.pPlaceholderValues); taosArrayDestroy(cxt.pPlaceholderValues);
return cxt.errCode; return cxt.errCode;
@ -782,7 +792,7 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowSubscriptions(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowSubscriptions(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_CREATE_VIEW_STMT: case QUERY_NODE_CREATE_VIEW_STMT:
return collectMetaKeyFromCreateViewStmt(pCxt, (SCreateViewStmt*)pStmt); return collectMetaKeyFromCreateViewStmt(pCxt, (SCreateViewStmt*)pStmt);
case QUERY_NODE_CREATE_VIEW_STMT: case QUERY_NODE_DROP_VIEW_STMT:
return collectMetaKeyFromDropViewStmt(pCxt, (SDropViewStmt*)pStmt); return collectMetaKeyFromDropViewStmt(pCxt, (SDropViewStmt*)pStmt);
default: default:
break; break;

View File

@ -263,6 +263,7 @@ static SKeyword keywordTable[] = {
{"VERBOSE", TK_VERBOSE}, {"VERBOSE", TK_VERBOSE},
{"VGROUP", TK_VGROUP}, {"VGROUP", TK_VGROUP},
{"VGROUPS", TK_VGROUPS}, {"VGROUPS", TK_VGROUPS},
{"VIEW", TK_VIEW},
{"VNODE", TK_VNODE}, {"VNODE", TK_VNODE},
{"VNODES", TK_VNODES}, {"VNODES", TK_VNODES},
{"WAL_FSYNC_PERIOD", TK_WAL_FSYNC_PERIOD}, {"WAL_FSYNC_PERIOD", TK_WAL_FSYNC_PERIOD},

View File

@ -7273,13 +7273,24 @@ static int32_t validateCreateView(STranslateContext* pCxt, SCreateViewStmt* pStm
static int32_t translateCreateView(STranslateContext* pCxt, SCreateViewStmt* pStmt) { static int32_t translateCreateView(STranslateContext* pCxt, SCreateViewStmt* pStmt) {
int32_t code = validateCreateView(pCxt, pStmt); int32_t code = validateCreateView(pCxt, pStmt);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = (*pCxt->pParseCxt->validateSqlFp)(pCxt->pParseCxt->validateSqlParam, pStmt->pQuery, pCxt->pParseCxt->pSql, &pStmt->createReq); SQuery* pQuery = NULL;
code = buildQueryAfterParse(&pQuery, pStmt->pQuery, 0, NULL);
if (TSDB_CODE_SUCCESS == code) {
code = (*pCxt->pParseCxt->validateSqlFp)(pCxt->pParseCxt->validateSqlParam, pQuery, pStmt->pQuerySql, &pStmt->createReq);
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
strncpy(pStmt->createReq.name, pStmt->viewName, sizeof(pStmt->createReq.name) - 1); strncpy(pStmt->createReq.name, pStmt->viewName, sizeof(pStmt->createReq.name) - 1);
snprintf(pStmt->createReq.dbFName, sizeof(pStmt->createReq.dbFName) - 1, "%d.%s", pCxt->pParseCxt->acctId, pStmt->dbName); snprintf(pStmt->createReq.dbFName, sizeof(pStmt->createReq.dbFName) - 1, "%d.%s", pCxt->pParseCxt->acctId, pStmt->dbName);
snprintf(pStmt->createReq.fullname, sizeof(pStmt->createReq.fullname) - 1, "%s.%s", pStmt->createReq.dbFName, pStmt->viewName);
TSWAP(pStmt->createReq.querySql, pStmt->pQuerySql);
pStmt->createReq.orReplace = pStmt->orReplace; pStmt->createReq.orReplace = pStmt->orReplace;
pStmt->createReq.sql = strdup(pCxt->pParseCxt->pSql);
if (NULL == pStmt->createReq.sql) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_VIEW, (FSerializeFunc)tSerializeSCMCreateViewReq, &pStmt->createReq); code = buildCmdMsg(pCxt, TDMT_MND_CREATE_VIEW, (FSerializeFunc)tSerializeSCMCreateViewReq, &pStmt->createReq);
} }
@ -7289,12 +7300,18 @@ static int32_t translateCreateView(STranslateContext* pCxt, SCreateViewStmt* pSt
static int32_t translateDropView(STranslateContext* pCxt, SDropViewStmt* pStmt) { static int32_t translateDropView(STranslateContext* pCxt, SDropViewStmt* pStmt) {
SMDropViewReq dropReq = {0}; SCMDropViewReq dropReq = {0};
SName name; SName name;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName)); tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
tNameGetFullDbName(&name, dropReq.dbFName); tNameGetFullDbName(&name, dropReq.dbFName);
strncpy(dropReq.name, pStmt->viewName, sizeof(dropReq.name) - 1);
snprintf(dropReq.fullname, sizeof(dropReq.fullname) - 1, "%s.%s", dropReq.dbFName, dropReq.name);
dropReq.sql = strdup(pCxt->pParseCxt->pSql);
if (NULL == dropReq.sql) {
return TSDB_CODE_OUT_OF_MEMORY;
}
dropReq.igNotExists = pStmt->ignoreNotExists; dropReq.igNotExists = pStmt->ignoreNotExists;
return buildCmdMsg(pCxt, TDMT_MND_DROP_VIEW, (FSerializeFunc)tSerializeSMDropViewReq, &dropReq); return buildCmdMsg(pCxt, TDMT_MND_DROP_VIEW, (FSerializeFunc)tSerializeSCMDropViewReq, &dropReq);
} }
@ -7730,10 +7747,10 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
code = translateRestoreDnode(pCxt, (SRestoreComponentNodeStmt*)pNode); code = translateRestoreDnode(pCxt, (SRestoreComponentNodeStmt*)pNode);
break; break;
case QUERY_NODE_CREATE_VIEW_STMT: case QUERY_NODE_CREATE_VIEW_STMT:
code = translateCreateView(pCxt, (SCreateStreamStmt*)pNode); code = translateCreateView(pCxt, (SCreateViewStmt*)pNode);
break; break;
case QUERY_NODE_DROP_VIEW_STMT: case QUERY_NODE_DROP_VIEW_STMT:
code = translateDropView(pCxt, (SCreateStreamStmt*)pNode); code = translateDropView(pCxt, (SDropViewStmt*)pNode);
break; break;
default: default:

View File

@ -5676,11 +5676,11 @@ static YYACTIONTYPE yy_reduce(
{ pCxt->pRootNode = createDropViewStmt(pCxt, yymsp[-1].minor.yy953, yymsp[0].minor.yy168); } { pCxt->pRootNode = createDropViewStmt(pCxt, yymsp[-1].minor.yy953, yymsp[0].minor.yy168); }
break; break;
case 345: /* full_view_name ::= view_name */ case 345: /* full_view_name ::= view_name */
{ yylhsminor.yy168 = createViewNode(pCxt, NULL, &yymsp[0].minor.yy737, NULL); } { yylhsminor.yy168 = createViewNode(pCxt, NULL, &yymsp[0].minor.yy737); }
yymsp[0].minor.yy168 = yylhsminor.yy168; yymsp[0].minor.yy168 = yylhsminor.yy168;
break; break;
case 346: /* full_view_name ::= db_name NK_DOT view_name */ case 346: /* full_view_name ::= db_name NK_DOT view_name */
{ yylhsminor.yy168 = createViewNode(pCxt, &yymsp[-2].minor.yy737, &yymsp[0].minor.yy737, NULL); } { yylhsminor.yy168 = createViewNode(pCxt, &yymsp[-2].minor.yy737, &yymsp[0].minor.yy737); }
yymsp[-2].minor.yy168 = yylhsminor.yy168; yymsp[-2].minor.yy168 = yylhsminor.yy168;
break; break;
case 347: /* cmd ::= CREATE STREAM not_exists_opt stream_name stream_options INTO full_table_name col_list_opt tag_def_or_ref_opt subtable_opt AS query_or_subquery */ case 347: /* cmd ::= CREATE STREAM not_exists_opt stream_name stream_options INTO full_table_name col_list_opt tag_def_or_ref_opt subtable_opt AS query_or_subquery */

View File

@ -316,6 +316,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "index already exists
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "index not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "index not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SMA_OPTION, "Invalid sma index option") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SMA_OPTION, "Invalid sma index option")
// mnode-view
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VIEW_ALREADY_EXIST, "view already exists in db")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VIEW_NOT_EXIST, "view not exists in db")
// dnode // dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_OFFLINE, "Dnode is offline") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_OFFLINE, "Dnode is offline")
TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_NOT_FOUND, "Mnode not found") TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_NOT_FOUND, "Mnode not found")