diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 97ee6ad162..9af4eced72 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -588,7 +588,8 @@ typedef struct { int8_t status; int8_t createdBy; // STREAM_CREATED_BY__USER or SMA int32_t fixedSinkVgId; // 0 for shuffle - int64_t smaId; // 0 for unused + SVgObj fixedSinkVg; + int64_t smaId; // 0 for unused int8_t trigger; int32_t triggerParam; int64_t waterMark; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 22a5f37334..d42610efa5 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -229,11 +229,14 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr taosArrayPush(tasks, &pTask); pTask->nodeId = pStream->fixedSinkVgId; +#if 0 SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId); if (pVgroup == NULL) { return -1; } pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); +#endif + pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg); // source pTask->sourceType = TASK_SOURCE__MERGE; pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; @@ -254,7 +257,8 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr // dispatch pTask->dispatchType = TASK_DISPATCH__NONE; - mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId); + /*mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);*/ + mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pStream->fixedSinkVg.vgId); return 0; } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 5c3c4d66a2..7aeac146dd 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -295,9 +295,9 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm } static void *mndBuildVDropSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) { - SEncoder encoder = {0}; - int32_t contLen; - SName name = {0}; + SEncoder encoder = {0}; + int32_t contLen; + SName name = {0}; tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); SVDropTSmaReq req = {0}; @@ -482,14 +482,6 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea memcpy(smaObj.ast, pCreate->ast, smaObj.astLen); } - SVgObj smaVgObj = {0}; - if (mndAllocSmaVgroup(pMnode, pDb, &smaVgObj) != 0) { - mError("sma:%s, failed to create since %s", smaObj.name, terrstr()); - return -1; - } - - smaObj.dstVgId = smaVgObj.vgId; - SStreamObj streamObj = {0}; tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN); @@ -502,7 +494,12 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.createdBy = STREAM_CREATED_BY__SMA; streamObj.fixedSinkVgId = smaObj.dstVgId; streamObj.smaId = smaObj.uid; - /*streamObj.physicalPlan = "";*/ + + if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) { + mError("sma:%s, failed to create since %s", smaObj.name, terrstr()); + return -1; + } + smaObj.dstVgId = streamObj.fixedSinkVg.vgId; int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, pReq); @@ -512,11 +509,11 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea mndTransSetDbInfo(pTrans, pDb); if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; - if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &smaVgObj) != 0) goto _OVER; + if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; - if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &smaVgObj) != 0) goto _OVER; + if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; - if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &smaVgObj) != 0) goto _OVER; + if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; @@ -656,7 +653,7 @@ static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj * return 0; } -static int32_t mndSetDropSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) { +static int32_t mndSetDropSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) { SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup); if (pVgRaw == NULL) return -1; if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;