diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index b1145b0c6b..b5d22cb7a5 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -31,6 +31,8 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 2bd13f395f..ebd34fb2a5 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -22,6 +22,7 @@ #include "mndMnode.h" #include "mndShow.h" #include "mndStb.c" +#include "mndStream.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" @@ -404,6 +405,18 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre memcpy(smaObj.ast, pCreate->ast, smaObj.astLen); } + SStreamObj streamObj = {0}; + tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); + tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN); + streamObj.createTime = taosGetTimestampMs(); + streamObj.updateTime = streamObj.createTime; + streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); + streamObj.dbUid = pDb->uid; + streamObj.version = 1; + streamObj.sql = pCreate->sql; + /*streamObj.physicalPlan = "";*/ + streamObj.logicalPlan = "not implemented"; + int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, &pReq->rpcMsg); if (pTrans == NULL) goto _OVER; @@ -414,6 +427,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -457,6 +471,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) { int32_t code = -1; SStbObj *pStb = NULL; SSmaObj *pSma = NULL; + SStreamObj *pStream = NULL; SDbObj *pDb = NULL; SUserObj *pUser = NULL; SMCreateSmaReq createReq = {0}; @@ -476,6 +491,12 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) { mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb); goto _OVER; } + + pStream = mndAcquireStream(pMnode, createReq.name); + if (pStream != NULL) { + mError("sma:%s, failed to create since stream:%s already exist", createReq.name, createReq.name); + goto _OVER; + } pSma = mndAcquireSma(pMnode, createReq.name); if (pSma != NULL) { @@ -514,6 +535,7 @@ _OVER: mndReleaseStb(pMnode, pStb); mndReleaseSma(pMnode, pSma); + mndReleaseStream(pMnode, pStream); mndReleaseDb(pMnode, pDb); mndReleaseUser(pMnode, pUser); tFreeSMCreateSmaReq(&createReq); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c87bb015b4..490c2f69e9 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -218,13 +218,13 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { return 0; } -static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char **pStr) { - if (NULL == pCreate->ast) { +static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { + if (NULL == ast) { return TSDB_CODE_SUCCESS; } SNode *pAst = NULL; - int32_t code = nodesStringToNode(pCreate->ast, &pAst); + int32_t code = nodesStringToNode(ast, &pAst); SQueryPlan *pPlan = NULL; if (TSDB_CODE_SUCCESS == code) { @@ -245,6 +245,28 @@ static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char ** return code; } +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { + if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) { + mError("topic:%s, failed to get plan since %s", pStream->name, terrstr()); + return -1; + } + + if (mndScheduleStream(pMnode, pTrans, pStream) < 0) { + mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr()); + return -1; + } + + SSdbRaw *pRedoRaw = mndStreamActionEncode(pStream); + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { + mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); + + return 0; +} + static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { mDebug("stream:%s to create", pCreate->name); SStreamObj streamObj = {0}; @@ -259,11 +281,6 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe /*streamObj.physicalPlan = "";*/ streamObj.logicalPlan = "not implemented"; - if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) { - mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr()); - return -1; - } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); if (pTrans == NULL) { mError("stream:%s, failed to create since %s", pCreate->name, terrstr()); @@ -271,20 +288,12 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe } mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); - if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) { - mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr()); + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) { + mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; } - SSdbRaw *pRedoRaw = mndStreamActionEncode(&streamObj); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); - if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); diff --git a/source/dnode/mnode/impl/test/sma/sma.cpp b/source/dnode/mnode/impl/test/sma/sma.cpp index 5b48906681..006524543f 100644 --- a/source/dnode/mnode/impl/test/sma/sma.cpp +++ b/source/dnode/mnode/impl/test/sma/sma.cpp @@ -156,7 +156,7 @@ void* MndTestSma::BuildCreateSmaReq(const char* smaname, const char* stbname, in createReq.tagsFilterLen = strlen(createReq.tagsFilter) + 1; createReq.sql = (char*)sql; createReq.sqlLen = strlen(createReq.sql) + 1; - createReq.ast = (char*)expr; + createReq.ast = (char*)ast; createReq.astLen = strlen(createReq.ast) + 1; int32_t tlen = tSerializeSMCreateSmaReq(NULL, 0, &createReq); @@ -201,7 +201,7 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) { test.SendShowRetrieveReq(); EXPECT_EQ(test.GetShowRows(), 1); } - +#if 0 { pReq = BuildCreateSmaReq(smaname, stbname, 0, "expr", "tagsFilter", "sql", "ast", &contLen); pRsp = test.SendReq(TDMT_MND_CREATE_SMA, pReq, contLen); @@ -233,4 +233,5 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) { test.SendShowRetrieveReq(); EXPECT_EQ(test.GetShowRows(), 0); } +#endif }