diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index b97eaf31d1..ebc2e50065 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -133,6 +133,7 @@ int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgr int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj** pStream); bool mndStreamNodeIsUpdated(SMnode *pMnode); +bool mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t mndProcessStreamHb(SRpcMsg *pReq); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3ec99f6e44..d486570300 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -801,12 +801,23 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } if (createReq.sql != NULL) { - sqlLen = strlen(createReq.sql); - sql = taosMemoryMalloc(sqlLen + 1); + sql = taosStrdup(createReq.sql); TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno); + } - memset(sql, 0, sqlLen + 1); - memcpy(sql, createReq.sql, sqlLen); + SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB); + if (pSourceDb == NULL) { + code = terrno; + mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB, + tstrerror(code)); + goto _OVER; + } + + bool snodeCheckSucc = mndCheckForSnode(pMnode, pSourceDb); + mndReleaseDb(pMnode, pSourceDb); + if (!snodeCheckSucc) { + code = TSDB_CODE_SNODE_NOT_DEPLOYED; + goto _OVER; } // build stream obj from request diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 6e48c58b30..206422b795 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1497,6 +1497,30 @@ bool mndStreamNodeIsUpdated(SMnode *pMnode) { return updated; } +bool mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SSnodeObj *pObj = NULL; + + if (pSrcDb->cfg.replications == 1) { + return true; + } else { + while (1) { + pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj); + if (pIter == NULL) { + break; + } + + sdbRelease(pSdb, pObj); + sdbCancelFetch(pSdb, pIter); + return true; + } + + mError("snode not existed when trying to create stream in db with multiple replica"); + return false; + } +} + uint32_t seed = 0; static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) { SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature}; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 9dbf16cb48..ab8eede0dd 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -629,7 +629,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg } break; case TDMT_STREAM_TASK_DEPLOY: { - int32_t code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len); + code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len); if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _err; diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c index 8b77fe7cb1..0da9acfd1d 100644 --- a/source/libs/stream/src/streamTimer.c +++ b/source/libs/stream/src/streamTimer.c @@ -56,7 +56,7 @@ void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* } } - stDebug("vgId:%d start %s tmr succ", vgId, pMsg); + stTrace("vgId:%d start %s tmr succ", vgId, pMsg); } void streamTmrStop(tmr_h tmrId) {