Merge pull request #11554 from taosdata/feature/3.0_wxy
feat: sql command "create stream"
This commit is contained in:
commit
61a6bee8f5
|
@ -1209,12 +1209,17 @@ typedef struct {
|
|||
int32_t code;
|
||||
} STaskDropRsp;
|
||||
|
||||
#define STREAM_TRIGGER_AT_ONCE 1
|
||||
#define STREAM_TRIGGER_WINDOW_CLOSE 2
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
char outputSTbName[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t igExists;
|
||||
char* sql;
|
||||
char* ast;
|
||||
int8_t triggerType;
|
||||
int64_t watermark;
|
||||
} SCMCreateStreamReq;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -272,14 +272,9 @@ typedef struct SKillStmt {
|
|||
int32_t targetId;
|
||||
} SKillStmt;
|
||||
|
||||
typedef enum EStreamTriggerType {
|
||||
STREAM_TRIGGER_AT_ONCE = 1,
|
||||
STREAM_TRIGGER_WINDOW_CLOSE
|
||||
} EStreamTriggerType;
|
||||
|
||||
typedef struct SStreamOptions {
|
||||
ENodeType type;
|
||||
EStreamTriggerType triggerType;
|
||||
int8_t triggerType;
|
||||
SNode* pWatermark;
|
||||
} SStreamOptions;
|
||||
|
||||
|
|
|
@ -109,6 +109,8 @@ typedef struct SWindowLogicNode {
|
|||
int64_t sessionGap;
|
||||
SNode* pTspk;
|
||||
SNode* pStateExpr;
|
||||
int8_t triggerType;
|
||||
int64_t watermark;
|
||||
} SWindowLogicNode;
|
||||
|
||||
typedef struct SSortLogicNode {
|
||||
|
@ -251,6 +253,8 @@ typedef struct SWinodwPhysiNode {
|
|||
SNodeList* pExprs; // these are expression list of parameter expression of function
|
||||
SNodeList* pFuncs;
|
||||
SNode* pTspk; // timestamp primary key
|
||||
int8_t triggerType;
|
||||
int64_t watermark;
|
||||
} SWinodwPhysiNode;
|
||||
|
||||
typedef struct SIntervalPhysiNode {
|
||||
|
|
|
@ -30,6 +30,8 @@ typedef struct SPlanContext {
|
|||
bool topicQuery;
|
||||
bool streamQuery;
|
||||
bool showRewrite;
|
||||
int8_t triggerType;
|
||||
int64_t watermark;
|
||||
} SPlanContext;
|
||||
|
||||
// Create the physical plan for the query, according to the AST.
|
||||
|
|
|
@ -597,6 +597,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_PAR_GROUPBY_WINDOW_COEXIST TAOS_DEF_ERROR_CODE(0, 0x2624)
|
||||
#define TSDB_CODE_PAR_INVALID_OPTION_UNIT TAOS_DEF_ERROR_CODE(0, 0x2625)
|
||||
#define TSDB_CODE_PAR_INVALID_KEEP_UNIT TAOS_DEF_ERROR_CODE(0, 0x2626)
|
||||
#define TSDB_CODE_PAR_AGG_FUNC_NESTING TAOS_DEF_ERROR_CODE(0, 0x2627)
|
||||
|
||||
//planner
|
||||
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
|
||||
|
|
|
@ -3381,6 +3381,8 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
|
|||
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, astLen) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->triggerType) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->watermark) < 0) return -1;
|
||||
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
|
||||
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
|
||||
|
||||
|
@ -3404,6 +3406,8 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
|
|||
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->triggerType) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->watermark) < 0) return -1;
|
||||
|
||||
if (sqlLen > 0) {
|
||||
pReq->sql = taosMemoryCalloc(1, sqlLen + 1);
|
||||
|
|
|
@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
|
|||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
||||
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
||||
|
||||
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans);
|
||||
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -429,7 +429,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
|
|||
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
|
||||
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER;
|
||||
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
|
|
@ -218,7 +218,7 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
|
||||
static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64_t watermark, char **pStr) {
|
||||
if (NULL == ast) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -232,6 +232,8 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
|
|||
.pAstRoot = pAst,
|
||||
.topicQuery = false,
|
||||
.streamQuery = true,
|
||||
.triggerType = triggerType,
|
||||
.watermark = watermark,
|
||||
};
|
||||
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
|
||||
}
|
||||
|
@ -245,7 +247,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) {
|
||||
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans) {
|
||||
SNode *pAst = NULL;
|
||||
|
||||
if (nodesStringToNode(ast, &pAst) < 0) {
|
||||
|
@ -265,7 +267,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
|
|||
|
||||
#endif
|
||||
|
||||
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) {
|
||||
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, triggerType, watermark, &pStream->physicalPlan)) {
|
||||
mError("topic:%s, failed to get plan since %s", pStream->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
@ -313,7 +315,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
|||
}
|
||||
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
|
||||
|
||||
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) {
|
||||
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pCreate->triggerType, pCreate->watermark, pTrans) != 0) {
|
||||
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
|
|
|
@ -1104,6 +1104,8 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
|
|||
static const char* jkWindowPhysiPlanExprs = "Exprs";
|
||||
static const char* jkWindowPhysiPlanFuncs = "Funcs";
|
||||
static const char* jkWindowPhysiPlanTsPk = "TsPk";
|
||||
static const char* jkWindowPhysiPlanTriggerType = "TriggerType";
|
||||
static const char* jkWindowPhysiPlanWatermark = "Watermark";
|
||||
|
||||
static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj;
|
||||
|
@ -1118,6 +1120,12 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkWindowPhysiPlanTsPk, nodeToJson, pNode->pTspk);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanWatermark, pNode->watermark);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1135,6 +1143,12 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkWindowPhysiPlanTsPk, (SNode**)&pNode->pTspk);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetNumberValue(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetNumberValue(pJson, jkWindowPhysiPlanWatermark, pNode->watermark);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -427,7 +427,7 @@ bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B).
|
|||
|
||||
/************************************************ create/drop stream **************************************************/
|
||||
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A)
|
||||
stream_options(B) into_opt(C) AS query_expression(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, B, C, D); }
|
||||
stream_options(B) into_opt(C) AS query_expression(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, D); }
|
||||
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
|
||||
|
||||
into_opt(A) ::= . { A = NULL; }
|
||||
|
|
|
@ -481,6 +481,14 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
|
|||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static EDealRes haveAggFunction(SNode* pNode, void* pContext) {
|
||||
if (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsAggFunc(((SFunctionNode*)pNode)->funcId)) {
|
||||
*((bool*)pContext) = true;
|
||||
return DEAL_RES_END;
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
|
||||
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
|
||||
|
@ -492,6 +500,11 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc)
|
|||
if (fmIsAggFunc(pFunc->funcId) && beforeHaving(pCxt->currClause)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION);
|
||||
}
|
||||
bool haveAggFunc = false;
|
||||
nodesWalkExprs(pFunc->pParameterList, haveAggFunction, &haveAggFunc);
|
||||
if (haveAggFunc) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_AGG_FUNC_NESTING);
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
|
@ -2173,6 +2186,14 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt*
|
|||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pOptions->pWatermark) {
|
||||
code = (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark)) ? pCxt->errCode : TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
createReq.triggerType = pStmt->pOptions->triggerType;
|
||||
createReq.watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, &createReq);
|
||||
}
|
||||
|
|
|
@ -91,6 +91,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
|||
return "Invalid option %s unit: %c, only m, h, d allowed";
|
||||
case TSDB_CODE_PAR_INVALID_KEEP_UNIT:
|
||||
return "Invalid option keep unit: %c, %c, %c, only m, h, d allowed";
|
||||
case TSDB_CODE_PAR_AGG_FUNC_NESTING:
|
||||
return "Aggregate functions do not support nesting";
|
||||
case TSDB_CODE_OUT_OF_MEMORY:
|
||||
return "Out of memory";
|
||||
default:
|
||||
|
|
|
@ -3452,7 +3452,7 @@ static YYACTIONTYPE yy_reduce(
|
|||
{ yymsp[-1].minor.yy100 = strtol(yymsp[0].minor.yy0.z, NULL, 10); }
|
||||
break;
|
||||
case 234: /* cmd ::= CREATE STREAM not_exists_opt stream_name stream_options into_opt AS query_expression */
|
||||
{ pCxt->pRootNode = createCreateStreamStmt(pCxt, yymsp[-5].minor.yy659, &yymsp[-4].minor.yy479, yymsp[-3].minor.yy452, yymsp[-2].minor.yy452, yymsp[0].minor.yy452); }
|
||||
{ pCxt->pRootNode = createCreateStreamStmt(pCxt, yymsp[-5].minor.yy659, &yymsp[-4].minor.yy479, yymsp[-2].minor.yy452, yymsp[-3].minor.yy452, yymsp[0].minor.yy452); }
|
||||
break;
|
||||
case 235: /* cmd ::= DROP STREAM exists_opt stream_name */
|
||||
{ pCxt->pRootNode = createDropStreamStmt(pCxt, yymsp[-1].minor.yy659, &yymsp[0].minor.yy479); }
|
||||
|
|
|
@ -463,6 +463,11 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
|
|||
static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow, SLogicNode** pLogicNode) {
|
||||
int32_t code = nodesCollectFuncs(pSelect, fmIsWindowClauseFunc, &pWindow->pFuncs);
|
||||
|
||||
if (pCxt->pPlanCxt->streamQuery) {
|
||||
pWindow->triggerType = pCxt->pPlanCxt->triggerType;
|
||||
pWindow->watermark = pCxt->pPlanCxt->watermark;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExpr(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW);
|
||||
}
|
||||
|
|
|
@ -796,6 +796,9 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
|
|||
}
|
||||
}
|
||||
|
||||
pWindow->triggerType = pWindowLogicNode->triggerType;
|
||||
pWindow->watermark = pWindowLogicNode->watermark;
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pPhyNode = (SPhysiNode*)pWindow;
|
||||
} else {
|
||||
|
|
|
@ -123,6 +123,12 @@ private:
|
|||
tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req);
|
||||
nodesStringToNode(req.ast, &pCxt->pAstRoot);
|
||||
pCxt->streamQuery = true;
|
||||
} else if (QUERY_NODE_CREATE_STREAM_STMT == nodeType(pQuery->pRoot)) {
|
||||
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pQuery->pRoot;
|
||||
pCxt->pAstRoot = pStmt->pQuery;
|
||||
pCxt->streamQuery = true;
|
||||
pCxt->triggerType = pStmt->pOptions->triggerType;
|
||||
pCxt->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
|
||||
} else {
|
||||
pCxt->pAstRoot = pQuery->pRoot;
|
||||
}
|
||||
|
@ -353,11 +359,11 @@ TEST_F(PlannerTest, createTopic) {
|
|||
ASSERT_TRUE(run());
|
||||
}
|
||||
|
||||
TEST_F(PlannerTest, stream) {
|
||||
TEST_F(PlannerTest, createStream) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("SELECT sum(c1) FROM st1");
|
||||
ASSERT_TRUE(run(true));
|
||||
bind("create stream if not exists s1 trigger window_close watermark 10s into st1 as select count(*) from t1 interval(10s)");
|
||||
ASSERT_TRUE(run());
|
||||
}
|
||||
|
||||
TEST_F(PlannerTest, createSmaIndex) {
|
||||
|
|
Loading…
Reference in New Issue