fix(stream): check return value

This commit is contained in:
Haojun Liao 2024-07-30 10:13:02 +08:00
parent d89dac8697
commit 0bb263076e
3 changed files with 15 additions and 14 deletions

View File

@ -1020,10 +1020,10 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId, static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
int8_t mndTrigger, bool lock) { int8_t mndTrigger, bool lock) {
int32_t code = -1; int32_t code = TSDB_CODE_SUCCESS;
int64_t ts = taosGetTimestampMs(); int64_t ts = taosGetTimestampMs();
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) { if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
return TSDB_CODE_SUCCESS; return code;
} }
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock); bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
@ -1087,13 +1087,11 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
goto _ERR; goto _ERR;
} }
if ((code = mndTransPrepare(pMnode, pTrans)) != TSDB_CODE_SUCCESS) { code = mndTransPrepare(pMnode, pTrans);
code = terrno; if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to prepare checkpoint trans since %s", terrstr()); mError("failed to prepare checkpoint trans since %s", terrstr());
goto _ERR;
} }
code = 0;
_ERR: _ERR:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
@ -1458,7 +1456,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1; return -1;
} }
if (mndTransPrepare(pMnode, pTrans) != 0) { code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
@ -2179,7 +2178,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
@ -2273,7 +2272,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
} }
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
if (mndTransPrepare(pMnode, pTrans) != 0) { code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
@ -2434,7 +2434,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
} }
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
@ -3129,7 +3129,7 @@ int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream,
} }
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);

View File

@ -94,7 +94,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
} }
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code != 0) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
@ -197,7 +197,8 @@ int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList) {
return code; return code;
} }
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) { code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;

View File

@ -1132,7 +1132,7 @@ int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, i
} }
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);