feat: support view

This commit is contained in:
dapan1121 2023-09-19 14:19:54 +08:00
parent 8654012d9a
commit 14e8db289d
28 changed files with 1437 additions and 14 deletions

View File

@ -141,6 +141,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_APPS,
TSDB_MGMT_TABLE_STREAM_TASKS,
TSDB_MGMT_TABLE_PRIVILEGES,
TSDB_MGMT_TABLE_VIEWS,
TSDB_MGMT_TABLE_MAX,
} EShowType;
@ -251,6 +252,7 @@ typedef enum ENodeType {
QUERY_NODE_CASE_WHEN,
QUERY_NODE_EVENT_WINDOW,
QUERY_NODE_HINT,
QUERY_NODE_VIEW,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR = 100,
@ -354,7 +356,9 @@ typedef enum ENodeType {
QUERY_NODE_RESTORE_MNODE_STMT,
QUERY_NODE_RESTORE_VNODE_STMT,
QUERY_NODE_PAUSE_STREAM_STMT,
QUERY_NODE_RESUME_STREAM_STMT,
QUERY_NODE_RESUME_STREAM_STMT,
QUERY_NODE_CREATE_VIEW_STMT,
QUERY_NODE_DROP_VIEW_STMT,
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN = 1000,
@ -3847,6 +3851,32 @@ typedef struct {
};
} SPackedData;
typedef struct {
char name[TSDB_VIEW_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
int8_t orReplace;
int8_t precision;
int32_t numOfCols;
SSchema* pSchema;
} SCMCreateViewReq;
typedef struct {
int64_t streamId;
} SCMCreateViewRsp;
int32_t tSerializeSCMCreateViewReq(void* buf, int32_t bufLen, const SCMCreateViewReq* pReq);
int32_t tDeserializeSCMCreateViewReq(void* buf, int32_t bufLen, SCMCreateViewReq* pReq);
void tFreeSCMCreateViewReq(SCMCreateViewReq* pReq);
typedef struct {
char dbFName[TSDB_DB_FNAME_LEN];
char viewName[TSDB_VIEW_NAME_LEN];
int8_t igNotExists;
} SCMDropViewReq;
#pragma pack(pop)
#ifdef __cplusplus

View File

@ -187,6 +187,7 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_VIEW, "create-view", SCMCreateViewReq, SCMCreateViewRsp)
TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)

View File

@ -51,6 +51,7 @@ typedef enum {
typedef struct SUserAuthInfo {
char user[TSDB_USER_LEN];
SName tbName;
bool isView;
AUTH_TYPE type;
} SUserAuthInfo;
@ -83,6 +84,7 @@ typedef struct SCatalogReq {
SArray* pTableIndex; // element is SNAME
SArray* pTableCfg; // element is SNAME
SArray* pTableTag; // element is SNAME
SArray* pView; // element is STablesReq
bool qNodeRequired; // valid qnode
bool dNodeRequired; // valid dnode
bool svrVerRequired;
@ -109,6 +111,7 @@ typedef struct SMetaData {
SArray* pTableCfg; // pRes = STableCfg*
SArray* pTableTag; // pRes = SArray<STagVal>*
SArray* pDnodeList; // pRes = SArray<SEpSet>*
SArray* pView; // pRes = SViewInfo*
SMetaRes* pSvrVer; // pRes = char*
} SMetaData;

View File

@ -488,6 +488,27 @@ typedef struct SDropFunctionStmt {
bool ignoreNotExists;
} SDropFunctionStmt;
typedef struct SQueryResInfo {
} SQueryResInfo;
typedef struct SCreateViewStmt {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
char viewName[TSDB_VIEW_NAME_LEN];
bool orReplace;
SNodeList* pCols;
SNode* pQuery;
SCMCreateViewReq createReq;
} SCreateViewStmt;
typedef struct SDropViewStmt {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
char viewName[TSDB_VIEW_NAME_LEN];
bool ignoreNotExists;
} SDropViewStmt;
typedef struct SGrantStmt {
ENodeType type;
char userName[TSDB_USER_LEN];

View File

@ -180,6 +180,16 @@ typedef struct STempTableNode {
SNode* pSubquery;
} STempTableNode;
typedef struct SViewNode {
STableNode table; // QUERY_NODE_REAL_TABLE
struct STableMeta* pMeta;
SVgroupsInfo* pVgroupList;
char qualDbName[TSDB_DB_NAME_LEN]; // SHOW qualDbName.TABLES
double ratio;
SArray* pSmaIndexes;
int8_t cacheLastMode;
} SViewNode;
typedef enum EJoinType {
JOIN_TYPE_INNER = 1,
JOIN_TYPE_LEFT,

View File

@ -33,6 +33,8 @@ typedef struct SStmtCallback {
int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**);
} SStmtCallback;
typedef int32_t (*validateSqlFn)(void* param, SQuery* pQuery, const char* sql, SQueryResInfo* pRes);
typedef struct SParseCsvCxt {
TdFilePtr fp; // last parsed file
int32_t tableNo; // last parsed table
@ -64,6 +66,8 @@ typedef struct SParseContext {
SArray* pTableMetaPos; // sql table pos => catalog data pos
SArray* pTableVgroupPos; // sql table pos => catalog data pos
int64_t allocatorId;
validateSqlFn validateSqlFp;
void* validateSqlParam;
} SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);

View File

@ -711,6 +711,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED TAOS_DEF_ERROR_CODE(0, 0x2669)
#define TSDB_CODE_PAR_INVALID_VARBINARY TAOS_DEF_ERROR_CODE(0, 0x266A)
#define TSDB_CODE_PAR_INVALID_IP_RANGE TAOS_DEF_ERROR_CODE(0, 0x266B)
#define TSDB_CODE_PAR_INVALID_VIEW_QUERY TAOS_DEF_ERROR_CODE(0, 0x266C)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
//planner

View File

@ -230,6 +230,9 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_SQL_SHOW_LEN 1024
#define TSDB_MAX_ALLOWED_SQL_LEN (1 * 1024 * 1024u) // sql length should be less than 1mb
#define TSDB_VIEW_NAME_LEN 193
#define TSDB_VIEW_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_VIEW_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
#define TSDB_TB_COMMENT_LEN 1025

View File

@ -2591,3 +2591,48 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param
schedulerFetchRows(pRequest->body.queryJob, &req);
}
int32_t clientValidateSql(void* param, SQuery* pQuery, const char* sql, SCMCreateViewReq* pReq) {
SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param;
SSyncQueryParam* syncParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&syncParam->sem, 0, 0);
SRequestObj* pRequest = pWrapper->pRequest;
SRequestObj* pNewRequest = NULL;
int32_t code = buildRequest(pRequest->pTscObj->id, sql, strlen(sql), syncParam, true, &pNewRequest, 0);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}
pNewRequest->pQuery = pQuery;
pNewRequest->body.queryFp = syncQueryFn;
doAsyncQuery(pNewRequest, false);
tsem_wait(&syncParam->sem);
code = pNewRequest->code;
pRequest->code = code;
if (TSDB_CODE_SUCCESS == code) {
pReq->numOfCols = pQuery->numOfResCols;
pReq->precision = pQuery->precision;
pReq->pSchema = taosMemoryMalloc(pQuery->numOfResCols * sizeof(SSchema));
if (NULL == pReq->pSchema) {
code = terrno = TSDB_CODE_OUT_OF_MEMORY;
} else {
memcpy(pReq->pSchema, pQuery->pResSchema, pQuery->numOfResCols * sizeof(SSchema));
}
} else if (0 != pNewRequest->msgBuf[0]) {
strncpy(pRequest->msgBuf, pNewRequest->msgBuf, pRequest->msgBufLen - 1);
pRequest->msgBuf[pRequest->msgBufLen - 1] = 0;
}
freeQueryParam(syncParam);
destroyRequest(pNewRequest);
return code;
}

View File

@ -1026,7 +1026,7 @@ void taos_query_a_with_reqid(TAOS *taos, const char *sql, __taos_async_fn_t fp,
taosAsyncQueryImplWithReqid(connId, sql, fp, param, false, reqid);
}
int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt, SSqlCallbackWrapper *pWrapper) {
const STscObj *pTscObj = pRequest->pTscObj;
*pCxt = taosMemoryCalloc(1, sizeof(SParseContext));
@ -1051,7 +1051,9 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
.async = true,
.svrVer = pTscObj->sVer,
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
.allocatorId = pRequest->allocatorRefId};
.allocatorId = pRequest->allocatorRefId,
.validateSqlFp = clientValidateSql,
.validateSqlParam = pWrapper};
return TSDB_CODE_SUCCESS;
}
@ -1068,7 +1070,7 @@ int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *p
}
if (TSDB_CODE_SUCCESS == code) {
code = createParseContext(pRequest, &pWrapper->pParseCtx);
code = createParseContext(pRequest, &pWrapper->pParseCtx, pWrapper);
}
if (TSDB_CODE_SUCCESS == code) {

View File

@ -8215,3 +8215,65 @@ void tDeleteMqSubTopicEp(SMqSubTopicEp *pSubTopicEp) {
pSubTopicEp->schema.nCols = 0;
taosArrayDestroy(pSubTopicEp->vgs);
}
int32_t tSerializeSCMCreateViewReq(void *buf, int32_t bufLen, const SCMCreateViewReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->dbFName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->orReplace) < 0) return -1;
if (tEncodeI8(&encoder, pReq->precision) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfCols) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfCols; ++i) {
SSchema *pSchema = &pReq->pSchema[i];
if (tEncodeSSchema(&encoder, pSchema) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSCMCreateViewReq(void *buf, int32_t bufLen, SCMCreateViewReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->dbFName) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->orReplace) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfCols) < 0) return -1;
if (pReq->numOfCols > 0) {
pReq->pSchema = taosArrayInit(pReq->numOfCols, sizeof(SSchema));
if (pReq->pSchema == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < pReq->numOfCols; ++i) {
SSchema* pSchema = pReq->pSchema + i;
if (tDecodeSSchema(&decoder, pSchema) < 0) return -1;
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSCMCreateViewReq(SCMCreateViewReq* pReq) {
if (NULL == pReq) {
return;
}
taosMemoryFree(pReq->pSchema);
}

View File

@ -182,6 +182,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_INDEX, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_INDEX, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RESTORE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_VIEW, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;

View File

@ -725,6 +725,62 @@ void tFreeStreamObj(SStreamObj* pObj);
// SArray* childInfo; // SArray<SStreamChildEpInfo>
// } SStreamCheckpointObj;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
// ctl
SRWLatch lock;
// create info
int64_t createTime;
int64_t updateTime;
int32_t version;
int32_t totalLevel;
int64_t smaId; // 0 for unused
// info
int64_t uid;
int8_t status;
SStreamConf conf;
// source and target
int64_t sourceDbUid;
int64_t targetDbUid;
char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t targetStbUid;
// fixedSinkVg is not applicable for encode and decode
SVgObj fixedSinkVg;
int32_t fixedSinkVgId; // 0 for shuffle
// transformation
char* sql;
char* ast;
char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>>
SArray* pHTasksList; // generate the results for already stored ts data
int64_t hTaskUid; // stream task for history ts data
SSchemaWrapper outputSchema;
SSchemaWrapper tagSchema;
// 3.0.20
int64_t checkpointFreq; // ms
int64_t currentTick; // do not serialize
int64_t deleteMark;
int8_t igCheckUpdate;
// 3.0.5.
int64_t checkpointId;
char reserve[256];
} SViewObj;
int32_t tEncodeSViewObj(SEncoder* pEncoder, const SViewObj* pObj);
int32_t tDecodeSViewObj(SDecoder* pDecoder, SViewObj* pObj, int32_t sver);
void tFreeSViewObj(SViewObj* pObj);
#ifdef __cplusplus
}
#endif

View File

@ -0,0 +1,48 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_STREAM_H_
#define _TD_MND_STREAM_H_
#include "mndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t mndInitStream(SMnode *pMnode);
void mndCleanupStream(SMnode *pMnode);
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
// for sma
// TODO refactor
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_STREAM_H_*/

View File

@ -0,0 +1,797 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndView.h"
int32_t mndInitView(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_VIEW,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndViewActionEncode,
.decodeFp = (SdbDecodeFp)mndViewActionDecode,
.insertFp = (SdbInsertFp)mndViewActionInsert,
.updateFp = (SdbUpdateFp)mndViewActionUpdate,
.deleteFp = (SdbDeleteFp)mndViewActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndRetrieveStream);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndCancelGetNextStream);
taosThreadMutexInit(&execNodeList.lock, NULL);
execNodeList.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskStatusEntry));
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupView(SMnode *pMnode) {
taosArrayDestroy(execNodeList.pTaskList);
taosHashCleanup(execNodeList.pTaskMap);
taosThreadMutexDestroy(&execNodeList.lock);
mDebug("mnd view cleanup");
}
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SViewObj *pObj) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->totalLevel) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->conf.igExpired) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->conf.trigger) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->conf.fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->conf.triggerParam) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->conf.watermark) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
if (pObj->sql != NULL) {
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
} else {
if (tEncodeCStr(pEncoder, "") < 0) return -1;
}
if (pObj->ast != NULL) {
if (tEncodeCStr(pEncoder, pObj->ast) < 0) return -1;
} else {
if (tEncodeCStr(pEncoder, "") < 0) return -1;
}
if (pObj->physicalPlan != NULL) {
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
} else {
if (tEncodeCStr(pEncoder, "") < 0) return -1;
}
int32_t sz = taosArrayGetSize(pObj->tasks);
if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
SArray *pArray = taosArrayGetP(pObj->tasks, i);
int32_t innerSz = taosArrayGetSize(pArray);
if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
for (int32_t j = 0; j < innerSz; j++) {
SStreamTask *pTask = taosArrayGetP(pArray, j);
pTask->ver = SSTREAM_TASK_VER;
if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1;
}
}
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
// 3.0.20 ver =2
if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->igCheckUpdate) < 0) return -1;
// 3.0.50 ver = 3
if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1;
if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SViewObj *pObj, int32_t sver) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->totalLevel) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->conf.igExpired) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->conf.trigger) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->conf.fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->conf.triggerParam) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->conf.watermark) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
pObj->tasks = NULL;
int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
if (sz != 0) {
pObj->tasks = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
int32_t innerSz;
if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
for (int32_t j = 0; j < innerSz; j++) {
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
taosArrayDestroy(pArray);
return -1;
}
if (tDecodeStreamTask(pDecoder, pTask) < 0) {
taosMemoryFree(pTask);
taosArrayDestroy(pArray);
return -1;
}
taosArrayPush(pArray, &pTask);
}
taosArrayPush(pObj->tasks, &pArray);
}
}
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
// 3.0.20
if (sver >= 2) {
if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1;
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI8(pDecoder, &pObj->igCheckUpdate) < 0) return -1;
}
}
if (sver >= 3) {
if (tDecodeI64(pDecoder, &pObj->checkpointId) < 0) return -1;
}
if (tDecodeCStrTo(pDecoder, pObj->reserve) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
void tFreeStreamObj(SViewObj *pStream) {
taosMemoryFree(pStream->sql);
taosMemoryFree(pStream->ast);
taosMemoryFree(pStream->physicalPlan);
if (pStream->outputSchema.nCols || pStream->outputSchema.pSchema) {
taosMemoryFree(pStream->outputSchema.pSchema);
}
pStream->tasks = freeStreamTasks(pStream->tasks);
pStream->pHTasksList = freeStreamTasks(pStream->pHTasksList);
if (pStream->tagSchema.nCols > 0) {
taosMemoryFree(pStream->tagSchema.pSchema);
}
}
SSdbRaw *mndViewActionEncode(SViewObj *pStream) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL;
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
if (tEncodeSViewObj(&encoder, pStream) < 0) {
tEncoderClear(&encoder);
goto STREAM_ENCODE_OVER;
}
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
if (pRaw == NULL) goto STREAM_ENCODE_OVER;
buf = taosMemoryMalloc(tlen);
if (buf == NULL) goto STREAM_ENCODE_OVER;
tEncoderInit(&encoder, buf, tlen);
if (tEncodeSStreamObj(&encoder, pStream) < 0) {
tEncoderClear(&encoder);
goto STREAM_ENCODE_OVER;
}
tEncoderClear(&encoder);
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, STREAM_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, STREAM_ENCODE_OVER);
terrno = TSDB_CODE_SUCCESS;
STREAM_ENCODE_OVER:
taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("stream:%s, failed to encode to raw:%p since %s", pStream->name, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("stream:%s, encode to raw:%p, row:%p", pStream->name, pRaw, pStream);
return pRaw;
}
SSdbRow *mndViewActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SViewObj *pStream = NULL;
void *buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
goto STREAM_DECODE_OVER;
}
if (sver != MND_STREAM_VER_NUMBER) {
terrno = 0;
mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
goto STREAM_DECODE_OVER;
}
pRow = sdbAllocRow(sizeof(SViewObj));
if (pRow == NULL) goto STREAM_DECODE_OVER;
pStream = sdbGetRowObj(pRow);
if (pStream == NULL) goto STREAM_DECODE_OVER;
int32_t tlen;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, dataPos, &tlen, STREAM_DECODE_OVER);
buf = taosMemoryMalloc(tlen + 1);
if (buf == NULL) goto STREAM_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER);
SDecoder decoder;
tDecoderInit(&decoder, buf, tlen + 1);
if (tDecodeSStreamObj(&decoder, pStream, sver) < 0) {
tDecoderClear(&decoder);
goto STREAM_DECODE_OVER;
}
tDecoderClear(&decoder);
terrno = TSDB_CODE_SUCCESS;
STREAM_DECODE_OVER:
taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("stream:%s, failed to decode from raw:%p since %s", pStream == NULL ? "null" : pStream->name, pRaw,
terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
mTrace("stream:%s, decode from raw:%p, row:%p", pStream->name, pRaw, pStream);
return pRow;
}
static int32_t mndViewActionInsert(SSdb *pSdb, SViewObj *pView) {
mTrace("view:%s, perform insert action", pView->name);
return 0;
}
static int32_t mndViewActionDelete(SSdb *pSdb, SViewObj *pStream) {
mTrace("stream:%s, perform delete action", pStream->name);
taosWLockLatch(&pStream->lock);
tFreeStreamObj(pStream);
taosWUnLockLatch(&pStream->lock);
return 0;
}
static int32_t mndViewActionUpdate(SSdb *pSdb, SViewObj *pOldStream, SViewObj *pNewStream) {
mTrace("stream:%s, perform update action", pOldStream->name);
atomic_exchange_32(&pOldStream->version, pNewStream->version);
taosWLockLatch(&pOldStream->lock);
pOldStream->status = pNewStream->status;
pOldStream->updateTime = pNewStream->updateTime;
taosWUnLockLatch(&pOldStream->lock);
return 0;
}
SViewObj *mndAcquireView(SMnode *pMnode, char *viewName) {
SSdb *pSdb = pMnode->pSdb;
SViewObj *pStream = sdbAcquire(pSdb, SDB_VIEW, viewName);
if (pStream == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
}
return pStream;
}
void mndReleaseStream(SMnode *pMnode, SViewObj *pStream) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pStream);
}
static int32_t mndBuildViewObj(SMnode *pMnode, SViewObj *pObj, SCMCreateStreamReq *pCreate) {
SNode *pAst = NULL;
SQueryPlan *pPlan = NULL;
mInfo("stream:%s to create", pCreate->name);
memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
pObj->createTime = taosGetTimestampMs();
pObj->updateTime = pObj->createTime;
pObj->version = 1;
pObj->smaId = 0;
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
char p[TSDB_STREAM_FNAME_LEN + 32] = {0};
snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");
pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name));
pObj->status = 0;
pObj->conf.igExpired = pCreate->igExpired;
pObj->conf.trigger = pCreate->triggerType;
pObj->conf.triggerParam = pCreate->maxDelay;
pObj->conf.watermark = pCreate->watermark;
pObj->conf.fillHistory = pCreate->fillHistory;
pObj->deleteMark = pCreate->deleteMark;
pObj->igCheckUpdate = pCreate->igUpdate;
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
if (pSourceDb == NULL) {
mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr());
return -1;
}
pObj->sourceDbUid = pSourceDb->uid;
mndReleaseDb(pMnode, pSourceDb);
memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
if (pTargetDb == NULL) {
mInfo("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr());
return -1;
}
tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
} else {
pObj->targetStbUid = pCreate->targetStbUid;
}
pObj->targetDbUid = pTargetDb->uid;
mndReleaseDb(pMnode, pTargetDb);
pObj->sql = pCreate->sql;
pObj->ast = pCreate->ast;
pCreate->sql = NULL;
pCreate->ast = NULL;
// deserialize ast
if (nodesStringToNode(pObj->ast, &pAst) < 0) {
goto FAIL;
}
// extract output schema from ast
if (qExtractResultSchema(pAst, (int32_t *)&pObj->outputSchema.nCols, &pObj->outputSchema.pSchema) != 0) {
goto FAIL;
}
int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
if (numOfNULL > 0) {
pObj->outputSchema.nCols += numOfNULL;
SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
if (!pFullSchema) {
goto FAIL;
}
int32_t nullIndex = 0;
int32_t dataIndex = 0;
for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) {
SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
if (nullIndex >= numOfNULL || i < pos->slotId) {
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name);
pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
dataIndex++;
} else {
pFullSchema[i].bytes = 0;
pFullSchema[i].colId = pos->colId;
pFullSchema[i].flags = COL_SET_NULL;
memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
pFullSchema[i].type = pos->type;
nullIndex++;
}
}
taosMemoryFree(pObj->outputSchema.pSchema);
pObj->outputSchema.pSchema = pFullSchema;
}
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
.watermark = pObj->conf.watermark,
.igExpired = pObj->conf.igExpired,
.deleteMark = pObj->deleteMark,
.igCheckUpdate = pObj->igCheckUpdate,
};
// using ast and param to build physical plan
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
goto FAIL;
}
// save physcial plan
if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) {
goto FAIL;
}
pObj->tagSchema.nCols = pCreate->numOfTags;
if (pCreate->numOfTags) {
pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
}
/*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
for (int32_t i = 0; i < pCreate->numOfTags; i++) {
SField *pField = taosArrayGet(pCreate->pTags, i);
pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
pObj->tagSchema.pSchema[i].bytes = pField->bytes;
pObj->tagSchema.pSchema[i].flags = pField->flags;
pObj->tagSchema.pSchema[i].type = pField->type;
memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN);
}
FAIL:
if (pAst != NULL) nodesDestroyNode(pAst);
if (pPlan != NULL) qDestroyQueryPlan(pPlan);
return 0;
}
static int32_t mndCreateView(SMnode *pMnode, char *acct, SCreateUserReq *pCreate, SRpcMsg *pReq) {
SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)pCreate->pass, strlen(pCreate->pass), userObj.pass);
tstrncpy(userObj.user, pCreate->user, TSDB_USER_LEN);
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
userObj.superUser = 0; // pCreate->superUser;
userObj.sysInfo = pCreate->sysInfo;
userObj.enable = pCreate->enable;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-user");
if (pTrans == NULL) {
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
return -1;
}
mInfo("trans:%d, used to create user:%s", pTrans->id, pCreate->user);
SSdbRaw *pCommitRaw = mndUserActionEncode(&userObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to commit redo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
mndTransDrop(pTrans);
return 0;
}
static int32_t mndProcessCreateViewReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SViewObj *pStream = NULL;
SDbObj *pDb = NULL;
SCMCreateViewReq createViewReq = {0};
SViewObj streamObj = {0};
if (tDeserializeSCMCreateViewReq(pReq->pCont, pReq->contLen, &createViewReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
mInfo("start to create view:%s, sql:%s", createViewReq.name, createViewReq.sql);
pStream = mndAcquireStream(pMnode, createStreamReq.name);
if (pStream != NULL) {
if (createStreamReq.igExists) {
mInfo("stream:%s, already exist, ignore exist is set", createStreamReq.name);
code = 0;
goto _OVER;
} else {
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
goto _OVER;
}
} else if (terrno != TSDB_CODE_MND_STREAM_NOT_EXIST) {
goto _OVER;
}
if (mndCreateView(pMnode, &streamObj, &createStreamReq) < 0) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
if (pTrans == NULL) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER;
}
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mndTransDrop(pTrans);
goto _OVER;
}
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb) != 0) {
mndTransDrop(pTrans);
goto _OVER;
}
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb) != 0) {
mndTransDrop(pTrans);
goto _OVER;
}
// execute creation
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
goto _OVER;
}
mndTransDrop(pTrans);
taosThreadMutexLock(&execNodeList.lock);
mDebug("register to stream task node list");
keepStreamTasksInBuf(&streamObj, &execNodeList);
taosThreadMutexUnlock(&execNodeList.lock);
code = TSDB_CODE_ACTION_IN_PROGRESS;
char detail[2000] = {0};
sprintf(detail,
"checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64
", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", maxDelay:%" PRId64
", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark,
createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate,
createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB,
createStreamReq.targetStbFullName, createStreamReq.triggerType, createStreamReq.watermark);
SName name = {0};
tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB);
//reuse this function for stream
auditRecord(pReq, pMnode->clusterId, "createView", name.dbname, "", detail);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
}
mndReleaseStream(pMnode, pStream);
tFreeSCMCreateStreamReq(&createStreamReq);
tFreeStreamObj(&streamObj);
return code;
}
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SViewObj *pStream = NULL;
SMDropStreamReq dropReq = {0};
if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
pStream = mndAcquireStream(pMnode, dropReq.name);
if (pStream == NULL) {
if (dropReq.igNotExists) {
mInfo("stream:%s, not exist, ignore not exist is set", dropReq.name);
sdbRelease(pMnode->pSdb, pStream);
return 0;
} else {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
}
}
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream");
if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
// mndTransSetSerial(pTrans);
// drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
// drop stream
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
char detail[100] = {0};
sprintf(detail, "igNotExists:%d", dropReq.igNotExists);
SName name = {0};
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB);
//reuse this function for stream
auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, "", detail);
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SViewObj *pStream = NULL;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
if (pShow->pIter == NULL) break;
SColumnInfoData *pColInfo;
SName n;
int32_t cols = 0;
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
char status[20 + VARSTR_HEADER_SIZE] = {0};
char status2[20] = {0};
mndShowStreamStatus(status2, pStream);
STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&sourceDB, false);
char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&targetDB, false);
if (pStream->targetSTbName[0] == 0) {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, NULL, true);
} else {
char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&targetSTB, false);
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false);
char trigger[20 + VARSTR_HEADER_SIZE] = {0};
char trigger2[20] = {0};
mndShowStreamTrigger(trigger2, pStream);
STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
numOfRows++;
sdbRelease(pSdb, pStream);
}
pShow->numOfRows += numOfRows;
return numOfRows;
}
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}

View File

@ -148,7 +148,8 @@ typedef enum {
SDB_DB = 19,
SDB_FUNC = 20,
SDB_IDX = 21,
SDB_MAX = 22
SDB_VIEW = 22,
SDB_MAX = 23
} ESdbType;
typedef struct SSdbRaw {

View File

@ -269,6 +269,10 @@ const char* nodesNodeName(ENodeType type) {
return "RestoreMnodeStmt";
case QUERY_NODE_RESTORE_VNODE_STMT:
return "RestoreVnodeStmt";
case QUERY_NODE_CREATE_VIEW_STMT:
return "CreateViewStmt";
case QUERY_NODE_DROP_VIEW_STMT:
return "DropViewStmt";
case QUERY_NODE_LOGIC_PLAN_SCAN:
return "LogicScan";
case QUERY_NODE_LOGIC_PLAN_JOIN:

View File

@ -304,6 +304,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SEventWindowNode));
case QUERY_NODE_HINT:
return makeNode(type, sizeof(SHintNode));
case QUERY_NODE_VIEW:
return makeNode(type, sizeof(SViewNode));
case QUERY_NODE_SET_OPERATOR:
return makeNode(type, sizeof(SSetOperator));
case QUERY_NODE_SELECT_STMT:
@ -467,6 +469,10 @@ SNode* nodesMakeNode(ENodeType type) {
case QUERY_NODE_RESTORE_MNODE_STMT:
case QUERY_NODE_RESTORE_VNODE_STMT:
return makeNode(type, sizeof(SRestoreComponentNodeStmt));
case QUERY_NODE_CREATE_VIEW_STMT:
return makeNode(type, sizeof(SCreateViewStmt));
case QUERY_NODE_DROP_VIEW_STMT:
return makeNode(type, sizeof(SDropViewStmt));
case QUERY_NODE_LOGIC_PLAN_SCAN:
return makeNode(type, sizeof(SScanLogicNode));
case QUERY_NODE_LOGIC_PLAN_JOIN:
@ -832,6 +838,13 @@ void nodesDestroyNode(SNode* pNode) {
destroyHintValue(pHint->option, pHint->value);
break;
}
case QUERY_NODE_VIEW: {
SViewNode* pView = (SViewNode*)pNode;
taosMemoryFreeClear(pView->pMeta);
taosMemoryFreeClear(pView->pVgroupList);
taosArrayDestroyEx(pView->pSmaIndexes, destroySmaIndex);
break;
}
case QUERY_NODE_SET_OPERATOR: {
SSetOperator* pStmt = (SSetOperator*)pNode;
nodesDestroyList(pStmt->pProjectionList);
@ -1115,6 +1128,13 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_RESTORE_MNODE_STMT: // no pointer field
case QUERY_NODE_RESTORE_VNODE_STMT: // no pointer field
break;
case QUERY_NODE_CREATE_VIEW_STMT: {
SCreateViewStmt* pStmt = (SCreateViewStmt*)pNode;
nodesDestroyNode(pStmt->pQuery);
break;
}
case QUERY_NODE_DROP_VIEW_STMT:
break;
case QUERY_NODE_LOGIC_PLAN_SCAN: {
SScanLogicNode* pLogicNode = (SScanLogicNode*)pNode;
destroyLogicNode((SLogicNode*)pLogicNode);

View File

@ -118,6 +118,7 @@ SNode* createNodeListNodeEx(SAstCreateContext* pCxt, SNode* p1, SNode* p2);
SNode* createRealTableNode(SAstCreateContext* pCxt, SToken* pDbName, SToken* pTableName, SToken* pTableAlias);
SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const SToken* pTableAlias);
SNode* createJoinTableNode(SAstCreateContext* pCxt, EJoinType type, SNode* pLeft, SNode* pRight, SNode* pJoinCond);
SNode* createViewNode(SAstCreateContext* pCxt, SToken* pDbName, SToken* pViewName);
SNode* createLimitNode(SAstCreateContext* pCxt, const SToken* pLimit, const SToken* pOffset);
SNode* createOrderByExprNode(SAstCreateContext* pCxt, SNode* pExpr, EOrder order, ENullOrder nullOrder);
SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode* pGap);

View File

@ -69,6 +69,7 @@ typedef struct SParseMetaCache {
SHashObj* pUdf; // key is funcName, element is SFuncInfo*
SHashObj* pTableIndex; // key is tbFName, element is SArray<STableIndexInfo>*
SHashObj* pTableCfg; // key is tbFName, element is STableCfg*
SHashObj* pViews; // key is viewFName, element is SViewInfo*
SArray* pDnodes; // element is SEpSet
bool dnodeRequired;
} SParseMetaCache;
@ -93,6 +94,8 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache);
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache);
int32_t reserveViewMetaInCache(int32_t acctId, const char* pDb, const char* pView, SParseMetaCache* pMetaCache);
int32_t reserveViewMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache);
int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveTableVgroupInCacheExt(const SName* pName, SParseMetaCache* pMetaCache);

View File

@ -622,6 +622,14 @@ language_opt(A) ::= LANGUAGE NK_STRING(B).
or_replace_opt(A) ::= . { A = false; }
or_replace_opt(A) ::= OR REPLACE. { A = true; }
/************************************************ create/drop view **************************************************/
cmd ::= CREATE or_replace_opt(A) VIEW full_view_name(B) col_list_opt(C) AS query_or_subquery(D).
{ pCxt->pRootNode = createCreateViewStmt(pCxt, A, B, C, D); }
cmd ::= DROP VIEW exists_opt(A) full_view_name(B). { pCxt->pRootNode = createDropViewStmt(pCxt, A, B); }
full_view_name(A) ::= view_name(B). { A = createViewNode(pCxt, NULL, &B, NULL); }
full_view_name(A) ::= db_name(B) NK_DOT view_name(C). { A = createViewNode(pCxt, &B, &C, NULL); }
/************************************************ create/drop stream **************************************************/
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
full_table_name(C) col_list_opt(H) tag_def_or_ref_opt(F) subtable_opt(G)
@ -750,6 +758,10 @@ column_name(A) ::= NK_ID(B).
%destructor function_name { }
function_name(A) ::= NK_ID(B). { A = B; }
%type view_name { SToken }
%destructor view_name { }
view_name(A) ::= NK_ID(B). { A = B; }
%type table_alias { SToken }
%destructor table_alias { }
table_alias(A) ::= NK_ID(B). { A = B; }

View File

@ -219,6 +219,16 @@ static bool checkCGroupName(SAstCreateContext* pCxt, SToken* pCGroup) {
return true;
}
static bool checkViewName(SAstCreateContext* pCxt, SToken* pViewName) {
trimEscape(pViewName);
if (pViewName->n >= TSDB_VIEW_NAME_LEN) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pViewName->z);
return false;
}
return true;
}
static bool checkStreamName(SAstCreateContext* pCxt, SToken* pStreamName) {
trimEscape(pStreamName);
if (pStreamName->n >= TSDB_STREAM_NAME_LEN) {
@ -715,6 +725,23 @@ SNode* createJoinTableNode(SAstCreateContext* pCxt, EJoinType type, SNode* pLeft
return (SNode*)joinTable;
}
SNode* createViewNode(SAstCreateContext* pCxt, SToken* pDbName, SToken* pViewName) {
CHECK_PARSER_STATUS(pCxt);
if (!checkDbName(pCxt, pDbName, true) || !checkViewName(pCxt, pViewName)) {
return NULL;
}
SViewNode* pView = (SViewNode*)nodesMakeNode(QUERY_NODE_VIEW);
CHECK_OUT_OF_MEM(pView);
if (NULL != pDbName) {
COPY_STRING_FORM_ID_TOKEN(pView->table.dbName, pDbName);
} else {
snprintf(pView->table.dbName, sizeof(pView->table.dbName), "%s", pCxt->pQueryCxt->db);
}
COPY_STRING_FORM_ID_TOKEN(pView->table.tableName, pViewName);
return (SNode*)pView;
}
SNode* createLimitNode(SAstCreateContext* pCxt, const SToken* pLimit, const SToken* pOffset) {
CHECK_PARSER_STATUS(pCxt);
SLimitNode* limitNode = (SLimitNode*)nodesMakeNode(QUERY_NODE_LIMIT);
@ -2126,6 +2153,30 @@ SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, con
return (SNode*)pStmt;
}
SNode* createCreateViewStmt(SAstCreateContext* pCxt, bool orReplace, SNode* pView, SNodeList* pCols, SNode* pQuery) {
CHECK_PARSER_STATUS(pCxt);
SCreateViewStmt* pStmt = (SCreateViewStmt*)nodesMakeNode(QUERY_NODE_CREATE_VIEW_STMT);
CHECK_OUT_OF_MEM(pStmt);
strcpy(pStmt->dbName, ((SViewNode*)pView)->table.dbName);
strcpy(pStmt->viewName, ((SViewNode*)pView)->table.tableName);
nodesDestroyNode(pView);
pStmt->orReplace = orReplace;
pStmt->pQuery = pQuery;
pStmt->pCols = pCols;
return (SNode*)pStmt;
}
SNode* createDropViewStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pView) {
CHECK_PARSER_STATUS(pCxt);
SDropViewStmt* pStmt = (SDropViewStmt*)nodesMakeNode(QUERY_NODE_DROP_VIEW_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->ignoreNotExists = ignoreNotExists;
strcpy(pStmt->dbName, ((SViewNode*)pView)->table.dbName);
strcpy(pStmt->viewName, ((SViewNode*)pView)->table.tableName);
nodesDestroyNode(pView);
return (SNode*)pStmt;
}
SNode* createStreamOptions(SAstCreateContext* pCxt) {
CHECK_PARSER_STATUS(pCxt);
SStreamOptions* pOptions = (SStreamOptions*)nodesMakeNode(QUERY_NODE_STREAM_OPTIONS);

View File

@ -646,6 +646,32 @@ static int32_t collectMetaKeyFromGrant(SCollectMetaKeyCxt* pCxt, SGrantStmt* pSt
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->objName, pStmt->tabName, pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromCreateViewStmt(SCollectMetaKeyCxt* pCxt, SCreateViewStmt* pStmt) {
int32_t code =
reserveViewMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
}
if (TSDB_CODE_SUCCESS == code) {
code = reserveUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, NULL, AUTH_TYPE_WRITE,
pCxt->pMetaCache);
}
return code;
}
static int32_t collectMetaKeyFromDropViewStmt(SCollectMetaKeyCxt* pCxt, SDropViewStmt* pStmt) {
int32_t code =
reserveViewMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, pStmt->viewName, AUTH_TYPE_WRITE,
pCxt->pMetaCache);
}
return code;
}
static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
pCxt->pStmt = pStmt;
switch (nodeType(pStmt)) {
@ -754,6 +780,10 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowBlockDist(pCxt, (SShowTableDistributedStmt*)pStmt);
case QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT:
return collectMetaKeyFromShowSubscriptions(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_CREATE_VIEW_STMT:
return collectMetaKeyFromCreateViewStmt(pCxt, (SCreateViewStmt*)pStmt);
case QUERY_NODE_CREATE_VIEW_STMT:
return collectMetaKeyFromDropViewStmt(pCxt, (SDropViewStmt*)pStmt);
default:
break;
}

View File

@ -35,7 +35,7 @@ typedef struct SAuthRewriteCxt {
static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt);
static void setUserAuthInfo(SParseContext* pCxt, const char* pDbName, const char* pTabName, AUTH_TYPE type,
SUserAuthInfo* pAuth) {
bool isView, SUserAuthInfo* pAuth) {
snprintf(pAuth->user, sizeof(pAuth->user), "%s", pCxt->pUser);
if (NULL == pTabName) {
tNameSetDbName(&pAuth->tbName, pCxt->acctId, pDbName, strlen(pDbName));
@ -43,16 +43,17 @@ static void setUserAuthInfo(SParseContext* pCxt, const char* pDbName, const char
toName(pCxt->acctId, pDbName, pTabName, &pAuth->tbName);
}
pAuth->type = type;
pAuth->isView = isView;
}
static int32_t checkAuth(SAuthCxt* pCxt, const char* pDbName, const char* pTabName, AUTH_TYPE type, SNode** pCond) {
static int32_t checkAuthImpl(SAuthCxt* pCxt, const char* pDbName, const char* pTabName, AUTH_TYPE type, SNode** pCond, bool isView) {
SParseContext* pParseCxt = pCxt->pParseCxt;
if (pParseCxt->isSuperUser) {
return TSDB_CODE_SUCCESS;
}
SUserAuthInfo authInfo = {0};
setUserAuthInfo(pCxt->pParseCxt, pDbName, pTabName, type, &authInfo);
setUserAuthInfo(pCxt->pParseCxt, pDbName, pTabName, type, isView, &authInfo);
int32_t code = TSDB_CODE_SUCCESS;
SUserAuthRes authRes = {0};
if (NULL != pCxt->pMetaCache) {
@ -70,6 +71,16 @@ static int32_t checkAuth(SAuthCxt* pCxt, const char* pDbName, const char* pTabNa
return TSDB_CODE_SUCCESS == code ? (authRes.pass ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED) : code;
}
static int32_t checkAuth(SAuthCxt* pCxt, const char* pDbName, const char* pTabName, AUTH_TYPE type, SNode** pCond) {
return checkAuthImpl(pCxt, pDbName, pTabName, type, pCond, false);
}
static int32_t checkViewAuth(SAuthCxt* pCxt, const char* pDbName, const char* pTabName, AUTH_TYPE type, SNode** pCond) {
return checkAuthImpl(pCxt, pDbName, pTabName, type, pCond, true);
}
static EDealRes authSubquery(SAuthCxt* pCxt, SNode* pStmt) {
return TSDB_CODE_SUCCESS == authQuery(pCxt, pStmt) ? DEAL_RES_CONTINUE : DEAL_RES_ERROR;
}
@ -238,6 +249,14 @@ static int32_t authAlterTable(SAuthCxt* pCxt, SAlterTableStmt* pStmt) {
return checkAuth(pCxt, pStmt->dbName, pStmt->tableName, AUTH_TYPE_WRITE, NULL);
}
static int32_t authCreateView(SAuthCxt* pCxt, SCreateViewStmt* pStmt) {
return checkAuth(pCxt, pStmt->dbName, NULL, AUTH_TYPE_WRITE, NULL);
}
static int32_t authDropView(SAuthCxt* pCxt, SDropViewStmt* pStmt) {
return checkViewAuth(pCxt, pStmt->dbName, pStmt->viewName, AUTH_TYPE_WRITE, NULL);
}
static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
switch (nodeType(pStmt)) {
case QUERY_NODE_SET_OPERATOR:
@ -283,6 +302,10 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
return authShowCreateTable(pCxt, (SShowCreateTableStmt*)pStmt);
case QUERY_NODE_CREATE_VIEW_STMT:
return authCreateView(pCxt, (SCreateViewStmt*)pStmt);
case QUERY_NODE_DROP_VIEW_STMT:
return authDropView(pCxt, (SDropViewStmt*)pStmt);
default:
break;
}

View File

@ -7262,6 +7262,42 @@ static int32_t translateResumeStream(STranslateContext* pCxt, SResumeStreamStmt*
return buildCmdMsg(pCxt, TDMT_MND_RESUME_STREAM, (FSerializeFunc)tSerializeSMResumeStreamReq, &req);
}
static int32_t validateCreateView(STranslateContext* pCxt, SCreateViewStmt* pStmt) {
if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type");
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateCreateView(STranslateContext* pCxt, SCreateViewStmt* pStmt) {
int32_t code = validateCreateView(pCxt, pStmt);
if (TSDB_CODE_SUCCESS == code) {
code = (*pCxt->pParseCxt->validateSqlFp)(pCxt->pParseCxt->validateSqlParam, pStmt->pQuery, pCxt->pParseCxt->pSql, &pStmt->createReq);
}
if (TSDB_CODE_SUCCESS == code) {
strncpy(pStmt->createReq.name, pStmt->viewName, sizeof(pStmt->createReq.name) - 1);
snprintf(pStmt->createReq.dbFName, sizeof(pStmt->createReq.dbFName) - 1, "%d.%s", pCxt->pParseCxt->acctId, pStmt->dbName);
pStmt->createReq.orReplace = pStmt->orReplace;
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_VIEW, (FSerializeFunc)tSerializeSCMCreateViewReq, &pStmt->createReq);
}
tFreeSCMCreateViewReq(&pStmt->createReq);
return code;
}
static int32_t translateDropView(STranslateContext* pCxt, SDropViewStmt* pStmt) {
SMDropViewReq dropReq = {0};
SName name;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
tNameGetFullDbName(&name, dropReq.dbFName);
dropReq.igNotExists = pStmt->ignoreNotExists;
return buildCmdMsg(pCxt, TDMT_MND_DROP_VIEW, (FSerializeFunc)tSerializeSMDropViewReq, &dropReq);
}
static int32_t readFromFile(char* pName, int32_t* len, char** buf) {
int64_t filesize = 0;
if (taosStatFile(pName, &filesize, NULL, NULL) < 0) {
@ -7693,6 +7729,13 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_RESTORE_VNODE_STMT:
code = translateRestoreDnode(pCxt, (SRestoreComponentNodeStmt*)pNode);
break;
case QUERY_NODE_CREATE_VIEW_STMT:
code = translateCreateView(pCxt, (SCreateStreamStmt*)pNode);
break;
case QUERY_NODE_DROP_VIEW_STMT:
code = translateDropView(pCxt, (SCreateStreamStmt*)pNode);
break;
default:
break;
}

View File

@ -519,9 +519,9 @@ int32_t getVnodeSysTableTargetName(int32_t acctId, SNode* pWhere, SName* pName)
}
static int32_t userAuthToString(int32_t acctId, const char* pUser, const char* pDb, const char* pTable, AUTH_TYPE type,
char* pStr) {
return sprintf(pStr, "%s*%d*%s*%s*%d", pUser, acctId, pDb, (NULL == pTable || '\0' == pTable[0]) ? "``" : pTable,
type);
char* pStr, bool isView) {
return sprintf(pStr, "%s*%d*%s*%s*%d*%d", pUser, acctId, pDb, (NULL == pTable || '\0' == pTable[0]) ? "``" : pTable,
type, isView);
}
static int32_t getIntegerFromAuthStr(const char* pStart, char** pNext) {
@ -691,6 +691,9 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
if (TSDB_CODE_SUCCESS == code) {
code = buildTableReq(pMetaCache->pTableCfg, &pCatalogReq->pTableCfg);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildTableReqFromDb(pMetaCache->pViews, &pCatalogReq->pView);
}
pCatalogReq->dNodeRequired = pMetaCache->dnodeRequired;
return code;
}
@ -781,7 +784,7 @@ static int32_t putUserAuthToCache(const SArray* pUserAuthReq, const SArray* pUse
SUserAuthInfo* pUser = taosArrayGet(pUserAuthReq, i);
char key[USER_AUTH_KEY_MAX_LEN] = {0};
int32_t len = userAuthToString(pUser->tbName.acctId, pUser->user, pUser->tbName.dbname, pUser->tbName.tname,
pUser->type, key);
pUser->type, key, pUser->isView);
if (TSDB_CODE_SUCCESS != putMetaDataToHash(key, len, pUserAuthData, i, pUserAuth)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -826,6 +829,9 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet
if (TSDB_CODE_SUCCESS == code) {
code = putTableDataToCache(pCatalogReq->pTableCfg, pMetaData->pTableCfg, &pMetaCache->pTableCfg);
}
if (TSDB_CODE_SUCCESS == code) {
code = putDbTableDataToCache(pCatalogReq->pView, pMetaData->pView, &pMetaCache->pViews);
}
pMetaCache->pDnodes = pMetaData->pDnodeList;
return code;
}
@ -880,6 +886,14 @@ int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCac
return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pTableMeta);
}
int32_t reserveViewMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
return reserveTableReqInDbCache(acctId, pDb, pTable, &pMetaCache->pViews);
}
int32_t reserveViewMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache) {
return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pViews);
}
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
@ -985,14 +999,23 @@ static int32_t reserveUserAuthInCacheImpl(const char* pKey, int32_t len, SParseM
int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, const char* pTable, AUTH_TYPE type,
SParseMetaCache* pMetaCache) {
char key[USER_AUTH_KEY_MAX_LEN] = {0};
int32_t len = userAuthToString(acctId, pUser, pDb, pTable, type, key);
int32_t len = userAuthToString(acctId, pUser, pDb, pTable, type, key, false);
return reserveUserAuthInCacheImpl(key, len, pMetaCache);
}
int32_t reserveViewUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, const char* pTable, AUTH_TYPE type,
SParseMetaCache* pMetaCache) {
char key[USER_AUTH_KEY_MAX_LEN] = {0};
int32_t len = userAuthToString(acctId, pUser, pDb, pTable, type, key, true);
return reserveUserAuthInCacheImpl(key, len, pMetaCache);
}
int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, SUserAuthInfo* pAuthReq, SUserAuthRes* pAuthRes) {
char key[USER_AUTH_KEY_MAX_LEN] = {0};
int32_t len = userAuthToString(pAuthReq->tbName.acctId, pAuthReq->user, pAuthReq->tbName.dbname,
pAuthReq->tbName.tname, pAuthReq->type, key);
pAuthReq->tbName.tname, pAuthReq->type, key, pAuthReq->isView);
SUserAuthRes* pAuth = NULL;
int32_t code = getMetaDataFromHash(key, len, pMetaCache->pUserAuth, (void**)&pAuth);
if (TSDB_CODE_SUCCESS == code) {
@ -1135,9 +1158,11 @@ void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
if (request) {
destoryParseTablesMetaReqHash(pMetaCache->pTableMeta);
destoryParseTablesMetaReqHash(pMetaCache->pTableVgroup);
destoryParseTablesMetaReqHash(pMetaCache->pViews);
} else {
taosHashCleanup(pMetaCache->pTableMeta);
taosHashCleanup(pMetaCache->pTableVgroup);
taosHashCleanup(pMetaCache->pViews);
}
taosHashCleanup(pMetaCache->pDbVgroup);
taosHashCleanup(pMetaCache->pDbCfg);

View File

@ -454,6 +454,130 @@ TEST_F(ParserInitialCTest, createFunction) {
run("CREATE OR REPLACE AGGREGATE FUNCTION IF NOT EXISTS udf2 AS 'udf' OUTPUTTYPE DOUBLE BUFSIZE 8 LANGUAGE 'python'");
}
/*
* CREATE [ OR REPLACE ] VIEW name [ ( column_name [, ...] ) ] AS query
*
*/
TEST_F(ParserInitialCTest, createView) {
useDb("root", "test");
SCMCreateStreamReq expect = {0};
auto clearCreateStreamReq = [&]() {
tFreeSCMCreateStreamReq(&expect);
memset(&expect, 0, sizeof(SCMCreateStreamReq));
};
auto setCreateStreamReq = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb,
int8_t igExists = 0) {
snprintf(expect.name, sizeof(expect.name), "0.%s", pStream);
snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb);
snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb);
expect.igExists = igExists;
expect.sql = taosStrdup(pSql);
};
auto setStreamOptions =
[&](int8_t createStb = STREAM_CREATE_STABLE_TRUE, int8_t triggerType = STREAM_TRIGGER_WINDOW_CLOSE,
int64_t maxDelay = 0, int64_t watermark = 0, int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED,
int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY, int8_t igUpdate = STREAM_DEFAULT_IGNORE_UPDATE) {
expect.createStb = createStb;
expect.triggerType = triggerType;
expect.maxDelay = maxDelay;
expect.watermark = watermark;
expect.fillHistory = fillHistory;
expect.igExpired = igExpired;
expect.igUpdate = igUpdate;
};
auto addTag = [&](const char* pFieldName, uint8_t type, int32_t bytes = 0) {
SField field = {0};
strcpy(field.name, pFieldName);
field.type = type;
field.bytes = bytes > 0 ? bytes : tDataTypes[type].bytes;
field.flags |= COL_SMA_ON;
if (NULL == expect.pTags) {
expect.pTags = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SField));
}
taosArrayPush(expect.pTags, &field);
expect.numOfTags += 1;
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_STREAM_STMT);
SCMCreateStreamReq req = {0};
ASSERT_TRUE(TSDB_CODE_SUCCESS ==
tDeserializeSCMCreateStreamReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
ASSERT_EQ(std::string(req.name), std::string(expect.name));
ASSERT_EQ(std::string(req.sourceDB), std::string(expect.sourceDB));
ASSERT_EQ(std::string(req.targetStbFullName), std::string(expect.targetStbFullName));
ASSERT_EQ(req.igExists, expect.igExists);
ASSERT_EQ(std::string(req.sql), std::string(expect.sql));
ASSERT_EQ(req.triggerType, expect.triggerType);
ASSERT_EQ(req.maxDelay, expect.maxDelay);
ASSERT_EQ(req.watermark, expect.watermark);
ASSERT_EQ(req.fillHistory, expect.fillHistory);
ASSERT_EQ(req.igExpired, expect.igExpired);
ASSERT_EQ(req.numOfTags, expect.numOfTags);
if (expect.numOfTags > 0) {
ASSERT_EQ(taosArrayGetSize(req.pTags), expect.numOfTags);
ASSERT_EQ(taosArrayGetSize(req.pTags), taosArrayGetSize(expect.pTags));
for (int32_t i = 0; i < expect.numOfTags; ++i) {
SField* pField = (SField*)taosArrayGet(req.pTags, i);
SField* pExpectField = (SField*)taosArrayGet(expect.pTags, i);
ASSERT_EQ(std::string(pField->name), std::string(pExpectField->name));
ASSERT_EQ(pField->type, pExpectField->type);
ASSERT_EQ(pField->bytes, pExpectField->bytes);
ASSERT_EQ(pField->flags, pExpectField->flags);
}
}
ASSERT_EQ(req.checkpointFreq, expect.checkpointFreq);
ASSERT_EQ(req.createStb, expect.createStb);
ASSERT_EQ(req.igUpdate, expect.igUpdate);
tFreeSCMCreateStreamReq(&req);
});
setCreateStreamReq("s1", "test", "create stream s1 into st3 as select count(*) from t1 interval(10s)", "st3");
setStreamOptions();
run("CREATE STREAM s1 INTO st3 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
clearCreateStreamReq();
setCreateStreamReq(
"s1", "test",
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 0 ignore "
"update 1 into st3 as select count(*) from t1 interval(10s)",
"st3", 1);
setStreamOptions(STREAM_CREATE_STABLE_TRUE, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND,
10 * MILLISECOND_PER_SECOND, 0, 0, 1);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 0 IGNORE "
"UPDATE 1 INTO st3 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
clearCreateStreamReq();
setCreateStreamReq("s1", "test",
"create stream s1 into st3 tags(tname varchar(10), id int) subtable(concat('new-', tname)) as "
"select _wstart wstart, count(*) cnt from st1 partition by tbname tname, tag1 id interval(10s)",
"st3");
addTag("tname", TSDB_DATA_TYPE_VARCHAR, 10 + VARSTR_HEADER_SIZE);
addTag("id", TSDB_DATA_TYPE_INT);
setStreamOptions();
run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)");
clearCreateStreamReq();
// st1 already exists
setCreateStreamReq(
"s1", "test",
"create stream s1 into st1 tags(tag2) as select max(c1), c2 from t1 partition by tbname tag2 interval(10s)",
"st1");
setStreamOptions(STREAM_CREATE_STABLE_FALSE);
run("CREATE STREAM s1 INTO st1 TAGS(tag2) AS SELECT MAX(c1), c2 FROM t1 PARTITION BY TBNAME tag2 INTERVAL(10S)");
clearCreateStreamReq();
}
/*
* CREATE MNODE ON DNODE dnode_id
*/

View File

@ -575,6 +575,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, "System table not al
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VARBINARY, "Invalidate varbinary value")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_IP_RANGE, "Invalid IPV4 address ranges")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream query")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type")
//planner
TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "Planner internal error")