From 0bb263076e18239066626a1698b0ffd71ee36fb6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Jul 2024 10:13:02 +0800 Subject: [PATCH] fix(stream): check return value --- source/dnode/mnode/impl/src/mndStream.c | 22 ++++++++++----------- source/dnode/mnode/impl/src/mndStreamHb.c | 5 +++-- source/dnode/mnode/impl/src/mndStreamUtil.c | 2 +- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 4dcd716f28..e20529f4b6 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1020,10 +1020,10 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId, int8_t mndTrigger, bool lock) { - int32_t code = -1; + int32_t code = TSDB_CODE_SUCCESS; int64_t ts = taosGetTimestampMs(); if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) { - return TSDB_CODE_SUCCESS; + return code; } bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock); @@ -1087,13 +1087,11 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre goto _ERR; } - if ((code = mndTransPrepare(pMnode, pTrans)) != TSDB_CODE_SUCCESS) { - code = terrno; + code = mndTransPrepare(pMnode, pTrans); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to prepare checkpoint trans since %s", terrstr()); - goto _ERR; } - code = 0; _ERR: mndTransDrop(pTrans); return code; @@ -1458,7 +1456,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - if (mndTransPrepare(pMnode, pTrans) != 0) { + code = mndTransPrepare(pMnode, pTrans); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -2179,7 +2178,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { taosWUnLockLatch(&pStream->lock); code = mndTransPrepare(pMnode, pTrans); - if (code) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -2273,7 +2272,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } taosWUnLockLatch(&pStream->lock); - if (mndTransPrepare(pMnode, pTrans) != 0) { + code = mndTransPrepare(pMnode, pTrans); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -2434,7 +2434,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange } code = mndTransPrepare(pMnode, pTrans); - if (code) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -3129,7 +3129,7 @@ int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, } code = mndTransPrepare(pMnode, pTrans); - if (code) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 1ca46f128f..507cafabe5 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -94,7 +94,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { } code = mndTransPrepare(pMnode, pTrans); - if (code != 0) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -197,7 +197,8 @@ int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList) { return code; } - if ((code = mndTransPrepare(pMnode, pTrans)) != 0) { + code = mndTransPrepare(pMnode, pTrans); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return code; diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 5a17d659cd..548eb118c7 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1132,7 +1132,7 @@ int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, i } code = mndTransPrepare(pMnode, pTrans); - if (code) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans);