fix(stream): ignore the in_progress code.

This commit is contained in:
Haojun Liao 2023-11-30 14:32:14 +08:00
parent 71860bfb46
commit de82aba2e5
1 changed files with 4 additions and 9 deletions

View File

@ -758,11 +758,11 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { /
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
SStreamObj streamObj = {0}; SStreamObj streamObj = {0};
char *sql = NULL; char *sql = NULL;
int32_t sqlLen = 0; int32_t sqlLen = 0;
terrno = TSDB_CODE_SUCCESS;
SCMCreateStreamReq createStreamReq = {0}; SCMCreateStreamReq createStreamReq = {0};
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) { if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {
@ -785,7 +785,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
if (pStream != NULL) { if (pStream != NULL) {
if (createStreamReq.igExists) { if (createStreamReq.igExists) {
mInfo("stream:%s, already exist, ignore exist is set", createStreamReq.name); mInfo("stream:%s, already exist, ignore exist is set", createStreamReq.name);
code = 0;
goto _OVER; goto _OVER;
} else { } else {
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST; terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
@ -808,8 +807,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
code = checkForNumOfStreams(pMnode, &streamObj); if (checkForNumOfStreams(pMnode, &streamObj) < 0) {
if (code != TSDB_CODE_SUCCESS) {
goto _OVER; goto _OVER;
} }
@ -872,8 +870,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
saveStreamTasksInfo(&streamObj, &execInfo); saveStreamTasksInfo(&streamObj, &execInfo);
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
code = TSDB_CODE_ACTION_IN_PROGRESS;
SName dbname = {0}; SName dbname = {0};
tNameFromString(&dbname, createStreamReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&dbname, createStreamReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
@ -890,8 +886,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
} }
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (terrno != TSDB_CODE_SUCCESS && terrno != TSDB_CODE_ACTION_IN_PROGRESS) {
code = terrno;
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
} }
@ -902,7 +897,7 @@ _OVER:
taosMemoryFreeClear(sql); taosMemoryFreeClear(sql);
} }
return code; return terrno;
} }
int64_t mndStreamGenChkpId(SMnode *pMnode) { int64_t mndStreamGenChkpId(SMnode *pMnode) {