refactor: remove assert
This commit is contained in:
parent
50729f0cc4
commit
27c7e23397
|
@ -133,7 +133,10 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
|
SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
|
||||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
|
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
|
||||||
ASSERT(pConsumer);
|
if (pConsumer == NULL) {
|
||||||
|
mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
mInfo("receive consumer recover msg, consumer id %" PRId64 ", status %s", pRecoverMsg->consumerId,
|
mInfo("receive consumer recover msg, consumer id %" PRId64 ", status %s", pRecoverMsg->consumerId,
|
||||||
mndConsumerStatusName(pConsumer->status));
|
mndConsumerStatusName(pConsumer->status));
|
||||||
|
@ -381,8 +384,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(strcmp(req.cgroup, pConsumer->cgroup) == 0);
|
|
||||||
|
|
||||||
atomic_store_32(&pConsumer->hbStatus, 0);
|
atomic_store_32(&pConsumer->hbStatus, 0);
|
||||||
|
|
||||||
// 1. check consumer status
|
// 1. check consumer status
|
||||||
|
@ -428,9 +429,8 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
for (int32_t i = 0; i < numOfTopics; i++) {
|
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||||
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
|
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
|
||||||
|
|
||||||
// txn guarantees pSub is created
|
// txn guarantees pSub is created
|
||||||
ASSERT(pSub);
|
|
||||||
taosRLockLatch(&pSub->lock);
|
taosRLockLatch(&pSub->lock);
|
||||||
|
|
||||||
SMqSubTopicEp topicEp = {0};
|
SMqSubTopicEp topicEp = {0};
|
||||||
|
@ -438,7 +438,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
// 2.1 fetch topic schema
|
// 2.1 fetch topic schema
|
||||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||||
ASSERT(pTopic);
|
|
||||||
taosRLockLatch(&pTopic->lock);
|
taosRLockLatch(&pTopic->lock);
|
||||||
tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
|
tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
|
||||||
topicEp.schema.nCols = pTopic->schema.nCols;
|
topicEp.schema.nCols = pTopic->schema.nCols;
|
||||||
|
@ -775,8 +774,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
taosWLockLatch(&pOldConsumer->lock);
|
taosWLockLatch(&pOldConsumer->lock);
|
||||||
|
|
||||||
if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
|
if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
|
||||||
ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
|
/*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
|
||||||
ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);
|
/*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
|
||||||
|
|
||||||
if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
|
if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
|
||||||
|
@ -798,8 +797,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
|
||||||
}
|
}
|
||||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
|
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
|
||||||
ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
|
/*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
|
||||||
ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);
|
/*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
|
||||||
|
|
||||||
int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
|
int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
|
||||||
/*pOldConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));*/
|
/*pOldConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));*/
|
||||||
|
@ -812,8 +811,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
|
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
|
||||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
|
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
|
||||||
ASSERT(taosArrayGetSize(pOldConsumer->currentTopics) == 0);
|
/*A(taosArrayGetSize(pOldConsumer->currentTopics) == 0);*/
|
||||||
ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
|
/*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
|
||||||
|
|
||||||
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
|
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
@ -830,15 +829,15 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
||||||
|
|
||||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
|
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
|
||||||
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);
|
/*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);*/
|
||||||
ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);
|
/*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);*/
|
||||||
|
|
||||||
char *addedTopic = strdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
|
char *addedTopic = strdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
|
||||||
// not exist in current topic
|
// not exist in current topic
|
||||||
#if 1
|
#if 0
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->currentTopics); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->currentTopics); i++) {
|
||||||
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
|
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
|
||||||
ASSERT(strcmp(topic, addedTopic) != 0);
|
A(strcmp(topic, addedTopic) != 0);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -879,15 +878,15 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
|
|
||||||
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
|
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
|
||||||
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);
|
/*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/
|
||||||
ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);
|
/*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);*/
|
||||||
char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
|
char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
|
||||||
|
|
||||||
// not exist in new topic
|
// not exist in new topic
|
||||||
#if 1
|
#if 0
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
|
||||||
char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
|
char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
|
||||||
ASSERT(strcmp(topic, removedTopic) != 0);
|
A(strcmp(topic, removedTopic) != 0);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -913,7 +912,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// must find the topic
|
// must find the topic
|
||||||
ASSERT(i < sz);
|
/*A(i < sz);*/
|
||||||
|
|
||||||
// set status
|
// set status
|
||||||
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
|
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
|
||||||
|
|
|
@ -115,13 +115,11 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
|
||||||
|
|
||||||
if (pStream->fixedSinkVgId == 0) {
|
if (pStream->fixedSinkVgId == 0) {
|
||||||
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
|
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
|
||||||
ASSERT(pDb);
|
|
||||||
if (pDb->cfg.numOfVgroups > 1) {
|
if (pDb->cfg.numOfVgroups > 1) {
|
||||||
isShuffle = true;
|
isShuffle = true;
|
||||||
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
|
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
|
||||||
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
||||||
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -140,9 +138,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
|
||||||
for (int32_t j = 0; j < sinkLvSize; j++) {
|
for (int32_t j = 0; j < sinkLvSize; j++) {
|
||||||
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
|
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
|
||||||
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
|
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
|
||||||
ASSERT(pVgInfo->vgId > 0);
|
|
||||||
pVgInfo->taskId = pLastLevelTask->taskId;
|
pVgInfo->taskId = pLastLevelTask->taskId;
|
||||||
ASSERT(pVgInfo->taskId != 0);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -152,7 +148,6 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
|
||||||
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
||||||
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
|
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
|
||||||
// one sink only
|
// one sink only
|
||||||
ASSERT(taosArrayGetSize(pArray) == 1);
|
|
||||||
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
|
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
|
||||||
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
|
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
|
||||||
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
|
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
|
||||||
|
@ -170,7 +165,6 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, co
|
||||||
plan->execNode.epSet = pTask->epSet;
|
plan->execNode.epSet = pTask->epSet;
|
||||||
|
|
||||||
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
|
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
|
||||||
ASSERT(0);
|
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -195,7 +189,6 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan,
|
||||||
plan->execNode.epSet = pTask->epSet;
|
plan->execNode.epSet = pTask->epSet;
|
||||||
|
|
||||||
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
|
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
|
||||||
ASSERT(0);
|
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -222,8 +215,6 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
|
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
|
||||||
|
|
||||||
ASSERT(taosArrayGetSize(pStream->tasks) == 1);
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SVgObj* pVgroup = NULL;
|
SVgObj* pVgroup = NULL;
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||||
|
@ -257,7 +248,10 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
pTask->tbSink.stbUid = pStream->targetStbUid;
|
pTask->tbSink.stbUid = pStream->targetStbUid;
|
||||||
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
||||||
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||||
ASSERT(pTask->tbSink.pSchemaWrapper);
|
if (pTask->tbSink.pSchemaWrapper == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
}
|
}
|
||||||
|
@ -265,7 +259,6 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
|
int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
ASSERT(pStream->fixedSinkVgId != 0);
|
|
||||||
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
|
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
|
||||||
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
|
@ -275,8 +268,6 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
pTask->fillHistory = pStream->fillHistory;
|
pTask->fillHistory = pStream->fillHistory;
|
||||||
mndAddTaskToTaskSet(tasks, pTask);
|
mndAddTaskToTaskSet(tasks, pTask);
|
||||||
|
|
||||||
ASSERT(pStream->fixedSinkVg.vgId == pStream->fixedSinkVgId);
|
|
||||||
|
|
||||||
pTask->nodeId = pStream->fixedSinkVgId;
|
pTask->nodeId = pStream->fixedSinkVgId;
|
||||||
#if 0
|
#if 0
|
||||||
SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
|
SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
|
||||||
|
@ -311,13 +302,16 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
|
int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
|
||||||
ASSERT(planTotLevel <= 2);
|
|
||||||
pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*));
|
pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*));
|
||||||
|
|
||||||
bool hasExtraSink = false;
|
bool hasExtraSink = false;
|
||||||
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
|
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
|
||||||
SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
|
SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
|
||||||
ASSERT(pDbObj != NULL);
|
if (pDbObj == NULL) {
|
||||||
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
|
bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
|
||||||
sdbRelease(pSdb, pDbObj);
|
sdbRelease(pSdb, pDbObj);
|
||||||
|
@ -351,7 +345,10 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
|
|
||||||
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
|
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
|
||||||
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
||||||
ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
|
if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
|
||||||
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
pInnerTask = tNewSStreamTask(pStream->uid);
|
pInnerTask = tNewSStreamTask(pStream->uid);
|
||||||
if (pInnerTask == NULL) {
|
if (pInnerTask == NULL) {
|
||||||
|
@ -409,7 +406,10 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
|
|
||||||
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1);
|
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1);
|
||||||
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
||||||
ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
|
if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
|
||||||
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -471,9 +471,15 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
taosArrayPush(pStream->tasks, &taskOneLevel);
|
taosArrayPush(pStream->tasks, &taskOneLevel);
|
||||||
|
|
||||||
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
|
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
|
||||||
ASSERT(LIST_LENGTH(inner->pNodeList) == 1);
|
if (LIST_LENGTH(inner->pNodeList) != 1) {
|
||||||
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
||||||
ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
|
if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
|
||||||
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -550,9 +556,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pSub->unassignedVgs);
|
|
||||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
|
||||||
|
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||||
|
@ -590,10 +593,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pSub->unassignedVgs->size > 0);
|
|
||||||
|
|
||||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
|
||||||
|
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -326,13 +326,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
||||||
|
|
||||||
// deserialize ast
|
// deserialize ast
|
||||||
if (nodesStringToNode(pObj->ast, &pAst) < 0) {
|
if (nodesStringToNode(pObj->ast, &pAst) < 0) {
|
||||||
/*ASSERT(0);*/
|
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// extract output schema from ast
|
// extract output schema from ast
|
||||||
if (qExtractResultSchema(pAst, (int32_t *)&pObj->outputSchema.nCols, &pObj->outputSchema.pSchema) != 0) {
|
if (qExtractResultSchema(pAst, (int32_t *)&pObj->outputSchema.nCols, &pObj->outputSchema.pSchema) != 0) {
|
||||||
/*ASSERT(0);*/
|
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -347,13 +345,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
||||||
|
|
||||||
// using ast and param to build physical plan
|
// using ast and param to build physical plan
|
||||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
||||||
/*ASSERT(0);*/
|
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// save physcial plan
|
// save physcial plan
|
||||||
if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) {
|
if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) {
|
||||||
/*ASSERT(0);*/
|
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -361,7 +357,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
||||||
if (pCreate->numOfTags) {
|
if (pCreate->numOfTags) {
|
||||||
pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
|
pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
|
||||||
}
|
}
|
||||||
ASSERT(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));
|
/*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
|
||||||
for (int32_t i = 0; i < pCreate->numOfTags; i++) {
|
for (int32_t i = 0; i < pCreate->numOfTags; i++) {
|
||||||
SField *pField = taosArrayGet(pCreate->pTags, i);
|
SField *pField = taosArrayGet(pCreate->pTags, i);
|
||||||
pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
|
pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
|
||||||
|
@ -378,9 +374,6 @@ FAIL:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
|
int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
|
||||||
if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
|
||||||
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
|
|
||||||
}
|
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, NULL, 0);
|
tEncoderInit(&encoder, NULL, 0);
|
||||||
tEncodeSStreamTask(&encoder, pTask);
|
tEncodeSStreamTask(&encoder, pTask);
|
||||||
|
@ -545,8 +538,6 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
||||||
ASSERT(pTask->nodeId != 0);
|
|
||||||
|
|
||||||
// vnode
|
// vnode
|
||||||
/*if (pTask->nodeId > 0) {*/
|
/*if (pTask->nodeId > 0) {*/
|
||||||
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
||||||
|
@ -800,10 +791,9 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
int32_t sz = taosArrayGetSize(pLevel);
|
int32_t sz = taosArrayGetSize(pLevel);
|
||||||
for (int32_t j = 0; j < sz; j++) {
|
for (int32_t j = 0; j < sz; j++) {
|
||||||
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
||||||
ASSERT(pTask->nodeId > 0);
|
/*A(pTask->nodeId > 0);*/
|
||||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->nodeId);
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->nodeId);
|
||||||
if (pVgObj == NULL) {
|
if (pVgObj == NULL) {
|
||||||
ASSERT(0);
|
|
||||||
taosRUnLockLatch(&pStream->lock);
|
taosRUnLockLatch(&pStream->lock);
|
||||||
mndReleaseStream(pMnode, pStream);
|
mndReleaseStream(pMnode, pStream);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
@ -863,7 +853,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
SMDropStreamReq dropReq = {0};
|
SMDropStreamReq dropReq = {0};
|
||||||
if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
|
if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
|
||||||
ASSERT(0);
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue