From 27c7e23397506962e6cf0258eba55fcaa26cbef2 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 21 Dec 2022 14:26:27 +0800 Subject: [PATCH] refactor: remove assert --- source/dnode/mnode/impl/src/mndConsumer.c | 41 +++++++++-------- source/dnode/mnode/impl/src/mndScheduler.c | 51 +++++++++++----------- source/dnode/mnode/impl/src/mndStream.c | 15 +------ 3 files changed, 47 insertions(+), 60 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 37e2c35225..cf2386348e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -133,7 +133,10 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId); - ASSERT(pConsumer); + if (pConsumer == NULL) { + mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId); + return -1; + } mInfo("receive consumer recover msg, consumer id %" PRId64 ", status %s", pRecoverMsg->consumerId, mndConsumerStatusName(pConsumer->status)); @@ -381,8 +384,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { return -1; } - ASSERT(strcmp(req.cgroup, pConsumer->cgroup) == 0); - atomic_store_32(&pConsumer->hbStatus, 0); // 1. check consumer status @@ -428,9 +429,8 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { for (int32_t i = 0; i < numOfTopics; i++) { char *topic = taosArrayGetP(pConsumer->currentTopics, i); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); - // txn guarantees pSub is created - ASSERT(pSub); + taosRLockLatch(&pSub->lock); SMqSubTopicEp topicEp = {0}; @@ -438,7 +438,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { // 2.1 fetch topic schema SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - ASSERT(pTopic); taosRLockLatch(&pTopic->lock); tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN); topicEp.schema.nCols = pTopic->schema.nCols; @@ -775,8 +774,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, taosWLockLatch(&pOldConsumer->lock); if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) { - ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0); - ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0); + /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/ + /*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/ if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) { pOldConsumer->status = MQ_CONSUMER_STATUS__READY; @@ -798,8 +797,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; } } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) { - ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0); - ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0); + /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/ + /*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/ int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics); /*pOldConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));*/ @@ -812,8 +811,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->status = MQ_CONSUMER_STATUS__LOST; } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) { - ASSERT(taosArrayGetSize(pOldConsumer->currentTopics) == 0); - ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0); + /*A(taosArrayGetSize(pOldConsumer->currentTopics) == 0);*/ + /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/ int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { @@ -830,15 +829,15 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->rebalanceTime = pNewConsumer->upTime; } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) { - ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1); - ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0); + /*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);*/ + /*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);*/ char *addedTopic = strdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); // not exist in current topic -#if 1 +#if 0 for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->currentTopics); i++) { char *topic = taosArrayGetP(pOldConsumer->currentTopics, i); - ASSERT(strcmp(topic, addedTopic) != 0); + A(strcmp(topic, addedTopic) != 0); } #endif @@ -879,15 +878,15 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, atomic_add_fetch_32(&pOldConsumer->epoch, 1); } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) { - ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0); - ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1); + /*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/ + /*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);*/ char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0); // not exist in new topic -#if 1 +#if 0 for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) { char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i); - ASSERT(strcmp(topic, removedTopic) != 0); + A(strcmp(topic, removedTopic) != 0); } #endif @@ -913,7 +912,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, } } // must find the topic - ASSERT(i < sz); + /*A(i < sz);*/ // set status if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) { diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index af1a29def0..bdef8000bd 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -115,13 +115,11 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream if (pStream->fixedSinkVgId == 0) { SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); - ASSERT(pDb); if (pDb->cfg.numOfVgroups > 1) { isShuffle = true; pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { - ASSERT(0); return -1; } } @@ -140,9 +138,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream for (int32_t j = 0; j < sinkLvSize; j++) { SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); if (pLastLevelTask->nodeId == pVgInfo->vgId) { - ASSERT(pVgInfo->vgId > 0); pVgInfo->taskId = pLastLevelTask->taskId; - ASSERT(pVgInfo->taskId != 0); break; } } @@ -152,7 +148,6 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; SArray* pArray = taosArrayGetP(pStream->tasks, 0); // one sink only - ASSERT(taosArrayGetSize(pArray) == 1); SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; @@ -170,7 +165,6 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, co plan->execNode.epSet = pTask->epSet; if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { - ASSERT(0); terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } @@ -195,7 +189,6 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, plan->execNode.epSet = pTask->epSet; if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { - ASSERT(0); terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } @@ -222,8 +215,6 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { void* pIter = NULL; SArray* tasks = taosArrayGetP(pStream->tasks, 0); - ASSERT(taosArrayGetSize(pStream->tasks) == 1); - while (1) { SVgObj* pVgroup = NULL; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); @@ -257,7 +248,10 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { pTask->tbSink.stbUid = pStream->targetStbUid; memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); - ASSERT(pTask->tbSink.pSchemaWrapper); + if (pTask->tbSink.pSchemaWrapper == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } } sdbRelease(pSdb, pVgroup); } @@ -265,7 +259,6 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { } int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { - ASSERT(pStream->fixedSinkVgId != 0); SArray* tasks = taosArrayGetP(pStream->tasks, 0); SStreamTask* pTask = tNewSStreamTask(pStream->uid); if (pTask == NULL) { @@ -275,8 +268,6 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { pTask->fillHistory = pStream->fillHistory; mndAddTaskToTaskSet(tasks, pTask); - ASSERT(pStream->fixedSinkVg.vgId == pStream->fixedSinkVgId); - pTask->nodeId = pStream->fixedSinkVgId; #if 0 SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId); @@ -311,13 +302,16 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans); - ASSERT(planTotLevel <= 2); + pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*)); bool hasExtraSink = false; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb); - ASSERT(pDbObj != NULL); + if (pDbObj == NULL) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } bool multiTarget = pDbObj->cfg.numOfVgroups > 1; sdbRelease(pSdb, pDbObj); @@ -351,7 +345,10 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); - ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE); + if (plan->subplanType != SUBPLAN_TYPE_MERGE) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } pInnerTask = tNewSStreamTask(pStream->uid); if (pInnerTask == NULL) { @@ -409,7 +406,10 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1); SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); - ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN); + if (plan->subplanType != SUBPLAN_TYPE_SCAN) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } void* pIter = NULL; while (1) { @@ -471,9 +471,15 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { taosArrayPush(pStream->tasks, &taskOneLevel); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); - ASSERT(LIST_LENGTH(inner->pNodeList) == 1); + if (LIST_LENGTH(inner->pNodeList) != 1) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); - ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN); + if (plan->subplanType != SUBPLAN_TYPE_SCAN) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } void* pIter = NULL; while (1) { @@ -550,9 +556,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); } - ASSERT(pSub->unassignedVgs); - ASSERT(taosHashGetSize(pSub->consumerHash) == 0); - void* pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); @@ -590,10 +593,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib sdbRelease(pSdb, pVgroup); } - ASSERT(pSub->unassignedVgs->size > 0); - - ASSERT(taosHashGetSize(pSub->consumerHash) == 0); - qDestroyQueryPlan(pPlan); return 0; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 05c0594339..10ecd56280 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -326,13 +326,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, // deserialize ast if (nodesStringToNode(pObj->ast, &pAst) < 0) { - /*ASSERT(0);*/ goto FAIL; } // extract output schema from ast if (qExtractResultSchema(pAst, (int32_t *)&pObj->outputSchema.nCols, &pObj->outputSchema.pSchema) != 0) { - /*ASSERT(0);*/ goto FAIL; } @@ -347,13 +345,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, // using ast and param to build physical plan if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { - /*ASSERT(0);*/ goto FAIL; } // save physcial plan if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) { - /*ASSERT(0);*/ goto FAIL; } @@ -361,7 +357,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, if (pCreate->numOfTags) { pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema)); } - ASSERT(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags)); + /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/ for (int32_t i = 0; i < pCreate->numOfTags; i++) { SField *pField = taosArrayGet(pCreate->pTags, i); pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1; @@ -378,9 +374,6 @@ FAIL: } int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) { - if (pTask->taskLevel == TASK_LEVEL__AGG) { - ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); - } SEncoder encoder; tEncoderInit(&encoder, NULL, 0); tEncodeSStreamTask(&encoder, pTask); @@ -545,8 +538,6 @@ _OVER: } static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { - ASSERT(pTask->nodeId != 0); - // vnode /*if (pTask->nodeId > 0) {*/ SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); @@ -800,10 +791,9 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { int32_t sz = taosArrayGetSize(pLevel); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - ASSERT(pTask->nodeId > 0); + /*A(pTask->nodeId > 0);*/ SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->nodeId); if (pVgObj == NULL) { - ASSERT(0); taosRUnLockLatch(&pStream->lock); mndReleaseStream(pMnode, pStream); mndTransDrop(pTrans); @@ -863,7 +853,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SMDropStreamReq dropReq = {0}; if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) { - ASSERT(0); terrno = TSDB_CODE_INVALID_MSG; return -1; }