diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 9b70116d8d..21969cc404 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -783,13 +783,15 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { / if (numOfStream > MND_STREAM_MAX_NUM) { mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); sdbCancelFetch(pMnode->pSdb, pIter); - return TSDB_CODE_MND_TOO_MANY_STREAMS; + terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; + return terrno; } if (pStream->targetStbUid == pStreamObj->targetStbUid) { mError("Cannot write the same stable as other stream:%s", pStream->name); sdbCancelFetch(pMnode->pSdb, pIter); - return TSDB_CODE_MND_INVALID_TARGET_TABLE; + terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; + return terrno; } } @@ -797,12 +799,12 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { / } static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { - SMnode * pMnode = pReq->info.node; - int32_t code = -1; + SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; SStreamObj streamObj = {0}; char * sql = NULL; int32_t sqlLen = 0; + terrno = TSDB_CODE_SUCCESS; SCMCreateStreamReq createStreamReq = {0}; if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) { @@ -825,7 +827,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { if (pStream != NULL) { if (createStreamReq.igExists) { mInfo("stream:%s, already exist, ignore exist is set", createStreamReq.name); - code = 0; goto _OVER; } else { terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST; @@ -848,8 +849,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - code = checkForNumOfStreams(pMnode, &streamObj); - if (code != TSDB_CODE_SUCCESS) { + if (checkForNumOfStreams(pMnode, &streamObj) < 0) { goto _OVER; } @@ -912,8 +912,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { saveStreamTasksInfo(&streamObj, &execInfo); taosThreadMutexUnlock(&execInfo.lock); - code = TSDB_CODE_ACTION_IN_PROGRESS; - SName dbname = {0}; tNameFromString(&dbname, createStreamReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); @@ -930,7 +928,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } _OVER: - if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + if (terrno != TSDB_CODE_SUCCESS && terrno != TSDB_CODE_ACTION_IN_PROGRESS) { mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); } @@ -940,7 +938,8 @@ _OVER: if (sql != NULL) { taosMemoryFreeClear(sql); } - return code; + + return terrno; } int64_t mndStreamGenChkpId(SMnode *pMnode) { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index c9756ef814..7749decf91 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -834,7 +834,7 @@ int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) { if (mndCheckTransConflict(pMnode, pTrans)) { terrno = TSDB_CODE_MND_TRANS_CONFLICT; mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - return -1; + return terrno; } return 0;