Merge pull request #23887 from taosdata/fix/3_liaohj
fix(stream):return the error code if create stream failed.
This commit is contained in:
commit
b27601db6d
|
@ -783,13 +783,15 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { /
|
||||||
if (numOfStream > MND_STREAM_MAX_NUM) {
|
if (numOfStream > MND_STREAM_MAX_NUM) {
|
||||||
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
|
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
|
||||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
return TSDB_CODE_MND_TOO_MANY_STREAMS;
|
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
|
||||||
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStream->targetStbUid == pStreamObj->targetStbUid) {
|
if (pStream->targetStbUid == pStreamObj->targetStbUid) {
|
||||||
mError("Cannot write the same stable as other stream:%s", pStream->name);
|
mError("Cannot write the same stable as other stream:%s", pStream->name);
|
||||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
return TSDB_CODE_MND_INVALID_TARGET_TABLE;
|
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
|
||||||
|
return terrno;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -798,11 +800,11 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { /
|
||||||
|
|
||||||
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
int32_t code = -1;
|
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
SStreamObj streamObj = {0};
|
SStreamObj streamObj = {0};
|
||||||
char * sql = NULL;
|
char * sql = NULL;
|
||||||
int32_t sqlLen = 0;
|
int32_t sqlLen = 0;
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SCMCreateStreamReq createStreamReq = {0};
|
SCMCreateStreamReq createStreamReq = {0};
|
||||||
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {
|
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {
|
||||||
|
@ -825,7 +827,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
if (pStream != NULL) {
|
if (pStream != NULL) {
|
||||||
if (createStreamReq.igExists) {
|
if (createStreamReq.igExists) {
|
||||||
mInfo("stream:%s, already exist, ignore exist is set", createStreamReq.name);
|
mInfo("stream:%s, already exist, ignore exist is set", createStreamReq.name);
|
||||||
code = 0;
|
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
|
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
|
||||||
|
@ -848,8 +849,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = checkForNumOfStreams(pMnode, &streamObj);
|
if (checkForNumOfStreams(pMnode, &streamObj) < 0) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -912,8 +912,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
saveStreamTasksInfo(&streamObj, &execInfo);
|
saveStreamTasksInfo(&streamObj, &execInfo);
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
|
||||||
|
|
||||||
SName dbname = {0};
|
SName dbname = {0};
|
||||||
tNameFromString(&dbname, createStreamReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
tNameFromString(&dbname, createStreamReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
|
||||||
|
@ -930,7 +928,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (terrno != TSDB_CODE_SUCCESS && terrno != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -940,7 +938,8 @@ _OVER:
|
||||||
if (sql != NULL) {
|
if (sql != NULL) {
|
||||||
taosMemoryFreeClear(sql);
|
taosMemoryFreeClear(sql);
|
||||||
}
|
}
|
||||||
return code;
|
|
||||||
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t mndStreamGenChkpId(SMnode *pMnode) {
|
int64_t mndStreamGenChkpId(SMnode *pMnode) {
|
||||||
|
|
|
@ -834,7 +834,7 @@ int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) {
|
||||||
if (mndCheckTransConflict(pMnode, pTrans)) {
|
if (mndCheckTransConflict(pMnode, pTrans)) {
|
||||||
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
return -1;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in New Issue