diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 3f03102a7a..93cd351543 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -634,7 +634,10 @@ static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, b static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) { int32_t code = 0; size_t size = taosArrayGetSize(tasks); - ASSERT(size >= 2); + if (size < 2) { + mError("task list size is less than 2"); + return; + } SArray* pDownTaskList = taosArrayGetP(tasks, size - 1); SArray* pUpTaskList = taosArrayGetP(tasks, size - 2); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 93397e3a8c..89617d74a7 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1208,7 +1208,11 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { streamMutexLock(&execInfo.lock); if (taosArrayGetSize(execInfo.pNodeList) == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); - ASSERT(taosArrayGetSize(execInfo.pTaskList) == 0); + if (taosArrayGetSize(execInfo.pTaskList) != 0) { + streamMutexUnlock(&execInfo.lock); + mError("stream task node change checking done, no vgroups exist, but task list is not empty"); + return TSDB_CODE_FAILED; + } } SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId)); @@ -2788,7 +2792,13 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { if (((now - pe->ts) >= 10 * 1000) || allSame) { mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId, pe->req.startTs, (now - pe->ts) / 1000.0); - ASSERT(chkId <= pe->req.checkpointId); + if (chkId > pe->req.checkpointId) { + streamMutexUnlock(&execInfo.lock); + taosArrayDestroy(pStreamList); + mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId, + pe->req.checkpointId, chkId); + return TSDB_CODE_FAILED; + } code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid); @@ -2828,7 +2838,12 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { if (taosArrayGetSize(pInfo->pTaskList) == 0) { mndClearConsensusRspEntry(pInfo); - ASSERT(streamId != -1); + if (streamId == -1) { + streamMutexUnlock(&execInfo.lock); + taosArrayDestroy(pStreamList); + mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId); + return TSDB_CODE_FAILED; + } void* p = taosArrayPush(pStreamList, &streamId); if (p == NULL) { mError("failed to put into stream list, stream:0x%" PRIx64, streamId); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 07bba4e1b3..3bb5617a9c 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -13,12 +13,13 @@ * along with this program. If not, see . */ +#include "mndDb.h" +#include "mndStb.h" #include "mndStream.h" #include "mndTrans.h" -#include "tmisce.h" #include "mndVgroup.h" -#include "mndStb.h" -#include "mndDb.h" +#include "taoserror.h" +#include "tmisce.h" struct SStreamTaskIter { SStreamObj *pStream; @@ -905,7 +906,12 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { } } - ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); + if (taosHashGetSize(pExecNode->pTaskMap) != taosArrayGetSize(pExecNode->pTaskList)) { + streamMutexUnlock(&pExecNode->lock); + destroyStreamTaskIter(pIter); + mError("task map size, task list size, not equal"); + return; + } // 2. remove stream entry in consensus hash table and checkpoint-report hash table (void) mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index da5873039b..36f5d31676 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -312,7 +312,12 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { } (void)mndRefreshUserIpWhiteList(pMnode); - ASSERT(commitIdx == mndSyncAppliedIndex(pFsm)); + SyncIndex fsmIndex = mndSyncAppliedIndex(pFsm); + if (commitIdx != fsmIndex) { + mError("vgId:1, sync restore finished, but commitIdx:%" PRId64 " is not equal to appliedIdx:%" PRId64, commitIdx, + fsmIndex); + mndSetRestored(pMnode, false); + } } int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {