From ebe1d09d7491cb797996e6cbe659eefe3dbc3749 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 22 Jul 2024 10:07:18 +0800 Subject: [PATCH] fix(stream): check return value. --- source/dnode/mnode/impl/inc/mndStream.h | 24 ++++---- source/dnode/mnode/impl/src/mndSma.c | 31 ++++++----- source/dnode/mnode/impl/src/mndStream.c | 62 +++++++++++++-------- source/dnode/mnode/impl/src/mndStreamHb.c | 20 ++++++- source/dnode/mnode/impl/src/mndStreamUtil.c | 24 +++++--- 5 files changed, 102 insertions(+), 59 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 0b6b6a9ef2..0a93848843 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -96,20 +96,20 @@ typedef struct STaskChkptInfo { int8_t dropHTask; }STaskChkptInfo; -int32_t mndInitStream(SMnode *pMnode); -void mndCleanupStream(SMnode *pMnode); -SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName); -void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); -int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); -int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream); -int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId); -int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt); -bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock); -int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId); +int32_t mndInitStream(SMnode *pMnode); +void mndCleanupStream(SMnode *pMnode); +int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream); +void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); +int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); +int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream); +int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId); +int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt); +bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock); +int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId); int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream); -SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); +int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList); void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, int32_t retryCode, int32_t acceptCode); @@ -146,7 +146,7 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInf int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks); +int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo); void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo); void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 946df84a0f..e4ac503d3b 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -768,8 +768,8 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) { char streamName[TSDB_TABLE_FNAME_LEN] = {0}; mndGetStreamNameFromSmaName(streamName, createReq.name); - pStream = mndAcquireStream(pMnode, streamName); - if (pStream != NULL) { + code = mndAcquireStream(pMnode, streamName, &pStream); + if (pStream != NULL || code != 0) { mError("sma:%s, failed to create since stream:%s already exist", createReq.name, streamName); terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST; goto _OVER; @@ -905,8 +905,10 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p char streamName[TSDB_TABLE_FNAME_LEN] = {0}; mndGetStreamNameFromSmaName(streamName, pSma->name); - SStreamObj *pStream = mndAcquireStream(pMnode, streamName); - if (pStream == NULL || pStream->smaId != pSma->uid) { + SStreamObj *pStream = NULL; + + code = mndAcquireStream(pMnode, streamName, &pStream); + if (pStream == NULL || pStream->smaId != pSma->uid || code != 0) { sdbRelease(pMnode->pSdb, pStream); goto _OVER; } else { @@ -960,8 +962,9 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p char streamName[TSDB_TABLE_FNAME_LEN] = {0}; mndGetStreamNameFromSmaName(streamName, pSma->name); - SStreamObj *pStream = mndAcquireStream(pMnode, streamName); - if (pStream != NULL && pStream->smaId == pSma->uid) { + SStreamObj *pStream = NULL; + code = mndAcquireStream(pMnode, streamName, &pStream); + if ((pStream != NULL && pStream->smaId == pSma->uid) || code != 0) { if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", pStream->name, terrstr()); mndReleaseStream(pMnode, pStream); @@ -1734,8 +1737,8 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { goto _OVER; } - pStream = mndAcquireStream(pMnode, streamName); - if (pStream != NULL) { + code = mndAcquireStream(pMnode, streamName, &pStream); + if (pStream != NULL || code != 0) { mError("tsma:%s, failed to create since stream:%s already exist", createReq.name, streamName); terrno = TSDB_CODE_MND_SMA_ALREADY_EXIST; goto _OVER; @@ -2215,7 +2218,7 @@ static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilt SSmaObj * pBaseTsma = NULL; SSdb * pSdb = pMnode->pSdb; void * pIter = NULL; - SStreamObj * pStreamObj = NULL; + SStreamObj * pStream = NULL; SStbObj * pStb = NULL; while (1) { @@ -2237,14 +2240,16 @@ static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilt char streamName[TSDB_TABLE_FNAME_LEN] = {0}; tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname); - pStreamObj = mndAcquireStream(pMnode, streamName); - if (!pStreamObj) { + pStream = NULL; + + code = mndAcquireStream(pMnode, streamName, &pStream); + if (!pStream || (code != 0)) { sdbRelease(pSdb, pSma); continue; } - int64_t streamId = pStreamObj->uid; - mndReleaseStream(pMnode, pStreamObj); + int64_t streamId = pStream->uid; + mndReleaseStream(pMnode, pStream); STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo)); if (!pTsma) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d57dc6e52e..fa4b9b64f4 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -251,13 +251,13 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream return 0; } -SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName) { - SSdb *pSdb = pMnode->pSdb; - SStreamObj *pStream = sdbAcquire(pSdb, SDB_STREAM, streamName); - if (pStream == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { +int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) { + SSdb *pSdb = pMnode->pSdb; + (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName); + if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; } - return pStream; + return terrno; } void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { @@ -706,7 +706,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { char *sql = NULL; int32_t sqlLen = 0; const char *pMsg = "create stream tasks on dnodes"; - + int32_t code = 0; terrno = TSDB_CODE_SUCCESS; SCMCreateStreamReq createReq = {0}; @@ -726,8 +726,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - pStream = mndAcquireStream(pMnode, createReq.name); - if (pStream != NULL) { + code = mndAcquireStream(pMnode, createReq.name, &pStream); + if (pStream != NULL || code != 0) { if (createReq.igExists) { mInfo("stream:%s, already exist, ignore exist is set", createReq.name); goto _OVER; @@ -1077,7 +1077,12 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { } bool allReady = true; - SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady); + SArray *pNodeSnapshot = NULL; + + int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot); + if (code) { + mError("failed to get the vgroup snapshot, ignore it and continue"); + } if (!allReady) { mWarn("not all vnodes ready, quit from vnodes status check"); taosArrayDestroy(pNodeSnapshot); @@ -1289,6 +1294,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; + int32_t code = 0; SMDropStreamReq dropReq = {0}; if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) { @@ -1299,8 +1305,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mDebug("recv drop stream:%s msg", dropReq.name); - pStream = mndAcquireStream(pMnode, dropReq.name); - if (pStream == NULL) { + code = mndAcquireStream(pMnode, dropReq.name, &pStream); + if (pStream == NULL || code != 0) { if (dropReq.igNotExists) { mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name); sdbRelease(pMnode->pSdb, pStream); @@ -1364,7 +1370,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid); + code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid); // drop all tasks if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) { @@ -1906,6 +1912,7 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) { static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; + int32_t code = 0; SMPauseStreamReq pauseReq = {0}; if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) { @@ -1913,9 +1920,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return -1; } - pStream = mndAcquireStream(pMnode, pauseReq.name); - - if (pStream == NULL) { + code = mndAcquireStream(pMnode, pauseReq.name, &pStream); + if (pStream == NULL || code != 0) { if (pauseReq.igNotExists) { mInfo("stream:%s, not exist, not pause stream", pauseReq.name); return 0; @@ -2000,7 +2006,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return -1; } - int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid); + code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid); // if nodeUpdate happened, not send pause trans if (mndStreamSetPauseAction(pMnode, pTrans, pStream) < 0) { @@ -2039,6 +2045,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; + int32_t code = 0; if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { return -1; @@ -2050,9 +2057,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return -1; } - pStream = mndAcquireStream(pMnode, resumeReq.name); - - if (pStream == NULL) { + code = mndAcquireStream(pMnode, resumeReq.name, &pStream); + if (pStream == NULL || code != 0) { if (resumeReq.igNotExists) { mInfo("stream:%s not exist, not resume stream", resumeReq.name); sdbRelease(pMnode->pSdb, pStream); @@ -2089,7 +2095,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return -1; } - int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid); + code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid); // set the resume action if (mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) { @@ -2348,7 +2354,13 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { } bool allReady = true; - SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady); + SArray *pNodeSnapshot = NULL; + + code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot); + if (code) { + mError("failed to take the vgroup snapshot, ignore it and continue"); + } + if (!allReady) { taosArrayDestroy(pNodeSnapshot); atomic_store_32(&mndNodeCheckSentinel, 0); @@ -2790,8 +2802,14 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { mDebug("start to process consensus-checkpointId in tmr"); bool allReady = true; - SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady); + SArray *pNodeSnapshot = NULL; + + int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot); taosArrayDestroy(pNodeSnapshot); + if (code) { + mError("failed to get the vgroup snapshot, ignore it and continue"); + } + if (!allReady) { mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process"); taosArrayDestroy(pStreamList); diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index bc10ec211d..5f0434e3d0 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -228,6 +228,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SStreamHbMsg req = {0}; SArray *pFailedChkpt = NULL; SArray *pOrphanTasks = NULL; + int32_t code = 0; if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { if (suspendAllStreams(pMnode, &pReq->info) < 0) { @@ -296,8 +297,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SStreamObj *pStream = mndGetStreamObj(pMnode, p->id.streamId); int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); - SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks); - mndAddConsensusTasks(pInfo, &cp); + SCheckpointConsensusInfo *pInfo = NULL; + + code = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks, &pInfo); + if (code == 0) { + mndAddConsensusTasks(pInfo, &cp); + } else { + mError("failed to get consensus checkpoint-info"); + } + mndReleaseStream(pMnode, pStream); } @@ -338,9 +346,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // kill the checkpoint trans and then set all tasks status to be normal if (taosArrayGetSize(pFailedChkpt) > 0) { bool allReady = true; + if (pMnode != NULL) { - SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); + SArray *p = NULL; + + int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &p); taosArrayDestroy(p); + if (code) { + mError("failed to get the vgroup snapshot, ignore it and continue"); + } } else { allReady = false; } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 843c024286..3a84856ae0 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -80,11 +80,12 @@ void destroyStreamTaskIter(SStreamTaskIter* pIter) { taosMemoryFree(pIter); } -SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { +int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SVgObj *pVgroup = NULL; int32_t replica = -1; // do the replica check + int32_t code = 0; *allReady = true; SArray *pVgroupList = taosArrayInit(4, sizeof(SNodeEntry)); @@ -157,7 +158,8 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { sdbRelease(pSdb, pObj); } - return pVgroupList; + *pList = pVgroupList; + return code; } SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { @@ -935,10 +937,10 @@ int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, i return TSDB_CODE_ACTION_IN_PROGRESS; } -SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks) { - void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); +int32_t mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo) { + *pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); if (pInfo != NULL) { - return (SCheckpointConsensusInfo*)pInfo; + return 0; } SCheckpointConsensusInfo p = { @@ -947,10 +949,14 @@ SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, .streamId = streamId, }; - taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p)); - - void* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId)); - return pChkptInfo; + int32_t code = taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p)); + if (code == 0) { + void *pChkptInfo = (SCheckpointConsensusInfo *)taosHashGet(pHash, &streamId, sizeof(streamId)); + *pInfo = pChkptInfo; + } else { + *pInfo = NULL; + } + return code; } // no matter existed or not, add the request into info list anyway, since we need to send rsp mannually