diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5fc6131a2e..149165b3fc 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -269,12 +269,15 @@ typedef struct SSchema { typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igExists; + float xFilesFactor; + int32_t aggregationMethod; + int32_t delay; int32_t numOfColumns; int32_t numOfTags; int32_t commentLen; SArray* pColumns; SArray* pTags; - char *comment; + char* comment; } SMCreateStbReq; int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e39fe02f29..35d6a3595f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -508,6 +508,9 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq if (tStartEncode(&encoder) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; + if (tEncodeFloat(&encoder, pReq->xFilesFactor) < 0) return -1; + if (tEncodeI32(&encoder, pReq->aggregationMethod) < 0) return -1; + if (tEncodeI32(&encoder, pReq->delay) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1; if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1; @@ -541,6 +544,9 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR if (tStartDecode(&decoder) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; + if (tDecodeFloat(&decoder, &pReq->xFilesFactor) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->aggregationMethod) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->delay) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1; if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index d4f5fa1e3b..24b6ba3d33 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -341,6 +341,9 @@ typedef struct { int64_t dbUid; int32_t version; int32_t nextColId; + float xFilesFactor; + int32_t aggregationMethod; + int32_t delay; int32_t numOfColumns; int32_t numOfTags; int32_t commentLen; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index b1145b0c6b..b5d22cb7a5 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -31,6 +31,8 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 2bd13f395f..ebd34fb2a5 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -22,6 +22,7 @@ #include "mndMnode.h" #include "mndShow.h" #include "mndStb.c" +#include "mndStream.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" @@ -404,6 +405,18 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre memcpy(smaObj.ast, pCreate->ast, smaObj.astLen); } + SStreamObj streamObj = {0}; + tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); + tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN); + streamObj.createTime = taosGetTimestampMs(); + streamObj.updateTime = streamObj.createTime; + streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); + streamObj.dbUid = pDb->uid; + streamObj.version = 1; + streamObj.sql = pCreate->sql; + /*streamObj.physicalPlan = "";*/ + streamObj.logicalPlan = "not implemented"; + int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, &pReq->rpcMsg); if (pTrans == NULL) goto _OVER; @@ -414,6 +427,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -457,6 +471,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) { int32_t code = -1; SStbObj *pStb = NULL; SSmaObj *pSma = NULL; + SStreamObj *pStream = NULL; SDbObj *pDb = NULL; SUserObj *pUser = NULL; SMCreateSmaReq createReq = {0}; @@ -476,6 +491,12 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) { mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb); goto _OVER; } + + pStream = mndAcquireStream(pMnode, createReq.name); + if (pStream != NULL) { + mError("sma:%s, failed to create since stream:%s already exist", createReq.name, createReq.name); + goto _OVER; + } pSma = mndAcquireSma(pMnode, createReq.name); if (pSma != NULL) { @@ -514,6 +535,7 @@ _OVER: mndReleaseStb(pMnode, pStb); mndReleaseSma(pMnode, pSma); + mndReleaseStream(pMnode, pStream); mndReleaseDb(pMnode, pDb); mndReleaseUser(pMnode, pUser); tFreeSMCreateSmaReq(&createReq); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index e8d6157661..52a257eaeb 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -85,6 +85,9 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->version, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, (int32_t)(pStb->xFilesFactor * 10000), STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pStb->aggregationMethod, STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pStb->delay, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, STB_ENCODE_OVER) @@ -148,6 +151,11 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->version, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, STB_DECODE_OVER) + int32_t xFilesFactor = 0; + SDB_GET_INT32(pRaw, dataPos, &xFilesFactor, STB_DECODE_OVER) + pStb->xFilesFactor = xFilesFactor / 10000.0f; + SDB_GET_INT32(pRaw, dataPos, &pStb->aggregationMethod, STB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pStb->delay, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, STB_DECODE_OVER) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index fb6906b5c9..3c142c560b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -240,13 +240,13 @@ static SArray *mndExtractNamesFromAst(const SNode *pAst) { return names; } -static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char **pStr) { - if (NULL == pCreate->ast) { +static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { + if (NULL == ast) { return TSDB_CODE_SUCCESS; } SNode *pAst = NULL; - int32_t code = nodesStringToNode(pCreate->ast, &pAst); + int32_t code = nodesStringToNode(ast, &pAst); SQueryPlan *pPlan = NULL; if (TSDB_CODE_SUCCESS == code) { @@ -267,6 +267,41 @@ static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char ** return code; } +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { + SNode *pAst = NULL; + if (nodesStringToNode(ast, &pAst) < 0) { + return -1; + } + SArray *names = mndExtractNamesFromAst(pAst); + printf("|"); + for (int i = 0; i < taosArrayGetSize(names); i++) { + printf(" %15s |", (char *)taosArrayGetP(names, i)); + } + printf("\n=======================================================\n"); + + pStream->outputName = names; + + if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) { + mError("topic:%s, failed to get plan since %s", pStream->name, terrstr()); + return -1; + } + + if (mndScheduleStream(pMnode, pTrans, pStream) < 0) { + mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr()); + return -1; + } + + SSdbRaw *pRedoRaw = mndStreamActionEncode(pStream); + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { + mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); + + return 0; +} + static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { mDebug("stream:%s to create", pCreate->name); SStreamObj streamObj = {0}; @@ -281,24 +316,6 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe /*streamObj.physicalPlan = "";*/ streamObj.logicalPlan = "not implemented"; - SNode *pAst = NULL; - if (nodesStringToNode(pCreate->ast, &pAst) < 0) { - return -1; - } - SArray *names = mndExtractNamesFromAst(pAst); - printf("|"); - for (int i = 0; i < taosArrayGetSize(names); i++) { - printf(" %15s |", (char *)taosArrayGetP(names, i)); - } - printf("\n=======================================================\n"); - - streamObj.outputName = names; - - if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) { - mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr()); - return -1; - } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); if (pTrans == NULL) { mError("stream:%s, failed to create since %s", pCreate->name, terrstr()); @@ -306,20 +323,12 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe } mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); - if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) { - mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr()); + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) { + mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; } - SSdbRaw *pRedoRaw = mndStreamActionEncode(&streamObj); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); - if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); diff --git a/source/dnode/mnode/impl/test/sma/sma.cpp b/source/dnode/mnode/impl/test/sma/sma.cpp index 5b48906681..006524543f 100644 --- a/source/dnode/mnode/impl/test/sma/sma.cpp +++ b/source/dnode/mnode/impl/test/sma/sma.cpp @@ -156,7 +156,7 @@ void* MndTestSma::BuildCreateSmaReq(const char* smaname, const char* stbname, in createReq.tagsFilterLen = strlen(createReq.tagsFilter) + 1; createReq.sql = (char*)sql; createReq.sqlLen = strlen(createReq.sql) + 1; - createReq.ast = (char*)expr; + createReq.ast = (char*)ast; createReq.astLen = strlen(createReq.ast) + 1; int32_t tlen = tSerializeSMCreateSmaReq(NULL, 0, &createReq); @@ -201,7 +201,7 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) { test.SendShowRetrieveReq(); EXPECT_EQ(test.GetShowRows(), 1); } - +#if 0 { pReq = BuildCreateSmaReq(smaname, stbname, 0, "expr", "tagsFilter", "sql", "ast", &contLen); pRsp = test.SendReq(TDMT_MND_CREATE_SMA, pReq, contLen); @@ -233,4 +233,5 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) { test.SendShowRetrieveReq(); EXPECT_EQ(test.GetShowRows(), 0); } +#endif }