diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index db18f87830..4daeeaa9bf 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -546,13 +546,10 @@ typedef struct { int64_t createTime; int64_t updateTime; int32_t version; - // TODO remove it int64_t smaId; // 0 for unused // info int64_t uid; int8_t status; - // TODO remove it - int32_t vgNum; // config int8_t dropPolicy; int8_t trigger; @@ -565,9 +562,10 @@ typedef struct { char targetDb[TSDB_DB_FNAME_LEN]; char targetSTbName[TSDB_TABLE_FNAME_LEN]; int64_t targetStbUid; - // assigned when scheduling int32_t fixedSinkVgId; // 0 for shuffle - SVgObj fixedSinkVg; + // fixedSinkVg is not applicable for encode and decode + SVgObj fixedSinkVg; + // transformation char* sql; char* ast; diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 80fe472c56..8e816d2dd6 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -32,6 +32,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, double filesFactor); +int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 65017027dc..6616e84488 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); +int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index b9bd1e4298..20ba71992e 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -18,35 +18,35 @@ #include "mndConsumer.h" int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { - int32_t sz = 0; if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; + + if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1; + if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1; + + if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; + + if (tEncodeI8(pEncoder, pObj->dropPolicy) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1; + + if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1; if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1; - if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; - if (tEncodeI32(pEncoder, pObj->vgNum) < 0) return -1; - if (tEncodeI8(pEncoder, pObj->dropPolicy) < 0) return -1; - if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; - if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1; if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1; - if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; - /*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/ - if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; - // TODO encode tasks - if (pObj->tasks) { - sz = taosArrayGetSize(pObj->tasks); - } - if (tEncodeI32(pEncoder, sz) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->ast) < 0) return -1; + if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; + + int32_t sz = taosArrayGetSize(pObj->tasks); + if (tEncodeI32(pEncoder, sz) < 0) return -1; for (int32_t i = 0; i < sz; i++) { SArray *pArray = taosArrayGetP(pObj->tasks, i); int32_t innerSz = taosArrayGetSize(pArray); @@ -58,32 +58,38 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { } if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1; + return pEncoder->pos; } int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1; + + if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1; + if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1; + + if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; + + if (tDecodeI8(pDecoder, &pObj->dropPolicy) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1; + + if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1; - if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; - if (tDecodeI32(pDecoder, &pObj->vgNum) < 0) return -1; - if (tDecodeI8(pDecoder, &pObj->dropPolicy) < 0) return -1; - if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; - if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1; + if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; - /*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/ + if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1; + pObj->tasks = NULL; int32_t sz; if (tDecodeI32(pDecoder, &sz) < 0) return -1; @@ -104,6 +110,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { } if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1; + return 0; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index c53e97cd53..6f8fc748c2 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -346,8 +346,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - ASSERT(pStream->vgNum == 0); - int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); ASSERT(totLevel <= 2); pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 54fd0fe867..3eb5b03efd 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -20,6 +20,7 @@ #include "mndDnode.h" #include "mndInfoSchema.h" #include "mndMnode.h" +#include "mndScheduler.h" #include "mndShow.h" #include "mndStb.h" #include "mndStream.h" @@ -563,6 +564,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.smaId = smaObj.uid; streamObj.watermark = 0; streamObj.trigger = STREAM_TRIGGER_AT_ONCE; + streamObj.ast = strdup(smaObj.ast); if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) { mError("sma:%s, failed to create since %s", smaObj.name, terrstr()); @@ -571,6 +573,39 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea smaObj.dstVgId = streamObj.fixedSinkVg.vgId; streamObj.fixedSinkVgId = smaObj.dstVgId; + SNode *pAst = NULL; + if (nodesStringToNode(streamObj.ast, &pAst) < 0) { + ASSERT(0); + return -1; + } + + // extract output schema from ast + if (qExtractResultSchema(pAst, (int32_t *)&streamObj.outputSchema.nCols, &streamObj.outputSchema.pSchema) != 0) { + ASSERT(0); + return -1; + } + + SQueryPlan *pPlan = NULL; + SPlanContext cxt = { + .pAstRoot = pAst, + .topicQuery = false, + .streamQuery = true, + .triggerType = streamObj.trigger, + .watermark = streamObj.watermark, + }; + + if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { + ASSERT(0); + return -1; + } + + // save physcial plan + if (nodesNodeToString((SNode *)pPlan, false, &streamObj.physicalPlan, NULL) != 0) { + ASSERT(0); + return -1; + } + if (pAst != NULL) nodesDestroyNode(pAst); + int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq); if (pTrans == NULL) goto _OVER; @@ -585,7 +620,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; // if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER; - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER; + if (mndScheduleStream(pMnode, pTrans, &streamObj) != 0) goto _OVER; + if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index de0b25b1dc..c1df9dd68c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -640,6 +640,12 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } + if (mndPersistStream(pMnode, pTrans, &streamObj) < 0) { + mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr()); + mndTransDrop(pTrans); + goto _OVER; + } + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); @@ -707,7 +713,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return code; } - // drop stream + // drop stream if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { sdbRelease(pMnode->pSdb, pStream); return -1;