diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index b94c60c4ab..9f168e2c83 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -125,7 +125,7 @@ typedef struct SMnode { } SMnode; void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); -int64_t mndGenerateUid(char *name, int32_t len); +int64_t mndGenerateUid(const char *name, int32_t len); int32_t mndAcquireRpcRef(SMnode *pMnode); void mndReleaseRpcRef(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index df8dc42d17..29e68ce4e8 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -624,7 +624,7 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { } // Note: uid 0 is reserved -int64_t mndGenerateUid(char *name, int32_t len) { +int64_t mndGenerateUid(const char *name, int32_t len) { int32_t hashval = MurmurHash3_32(name, len); do { int64_t us = taosGetTimestampUs(); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index c040f0d05b..88629ebc69 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -479,7 +479,8 @@ static void mndDestroySmaObj(SSmaObj *pSmaObj) { } } -static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb) { +static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb, + const char *streamName) { SSmaObj smaObj = {0}; memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); memcpy(smaObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN); @@ -520,12 +521,12 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea } SStreamObj streamObj = {0}; - tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); + tstrncpy(streamObj.name, streamName, TSDB_STREAM_FNAME_LEN); tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(streamObj.targetDb, streamObj.sourceDb, TSDB_DB_FNAME_LEN); streamObj.createTime = taosGetTimestampMs(); streamObj.updateTime = streamObj.createTime; - streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); + streamObj.uid = mndGenerateUid(streamName, strlen(streamName)); streamObj.sourceDbUid = pDb->uid; streamObj.targetDbUid = pDb->uid; streamObj.version = 1; @@ -590,7 +591,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea if (pTrans == NULL) goto _OVER; mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetSerial(pTrans); - mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name); + mDebug("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name); if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; @@ -638,6 +639,14 @@ static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) { return 0; } +static void mndGetStreamNameFromSmaName(char *streamName, char *smaName) { + SName n; + tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + streamName[0] = '1'; + streamName[1] = '.'; + strcpy(streamName + 2, tNameGetTableName(&n)); +} + static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; @@ -663,9 +672,12 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) { goto _OVER; } - pStream = mndAcquireStream(pMnode, createReq.name); + char streamName[TSDB_TABLE_FNAME_LEN] = {0}; + mndGetStreamNameFromSmaName(streamName, createReq.name); + + pStream = mndAcquireStream(pMnode, streamName); if (pStream != NULL) { - mError("sma:%s, failed to create since stream:%s already exist", createReq.name, createReq.name); + mError("sma:%s, failed to create since stream:%s already exist", createReq.name, streamName); terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST; goto _OVER; } @@ -692,7 +704,7 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) { goto _OVER; } - code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb); + code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb, streamName); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: @@ -789,7 +801,10 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name); mndTransSetDbName(pTrans, pDb->name, NULL); - SStreamObj *pStream = mndAcquireStream(pMnode, pSma->name); + char streamName[TSDB_TABLE_FNAME_LEN] = {0}; + mndGetStreamNameFromSmaName(streamName, pSma->name); + + SStreamObj *pStream = mndAcquireStream(pMnode, streamName); if (pStream == NULL || pStream->smaId != pSma->uid) { sdbRelease(pMnode->pSdb, pStream); goto _OVER; @@ -838,7 +853,10 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId); if (pVgroup == NULL) goto _OVER; - SStreamObj *pStream = mndAcquireStream(pMnode, pSma->name); + char streamName[TSDB_TABLE_FNAME_LEN] = {0}; + mndGetStreamNameFromSmaName(streamName, pSma->name); + + SStreamObj *pStream = mndAcquireStream(pMnode, streamName); if (pStream != NULL && pStream->smaId == pSma->uid) { if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());