TD-13747 merge 3.0
This commit is contained in:
parent
6cc87452cf
commit
d05fccca47
|
@ -32,24 +32,24 @@
|
||||||
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
|
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
|
||||||
SSdb* pSdb = pMnode->pSdb;
|
SSdb* pSdb = pMnode->pSdb;
|
||||||
SVgObj* pVgroup = NULL;
|
SVgObj* pVgroup = NULL;
|
||||||
SQueryDag* pDag = qStringToDag(pTopic->physicalPlan);
|
SQueryPlan* pPlan = qStringToQueryPlan(pTopic->physicalPlan);
|
||||||
SArray* pAray = NULL;
|
SArray* pAray = NULL;
|
||||||
SArray* unassignedVg = pSub->unassignedVg;
|
SArray* unassignedVg = pSub->unassignedVg;
|
||||||
|
|
||||||
ASSERT(pSub->vgNum == 0);
|
ASSERT(pSub->vgNum == 0);
|
||||||
|
|
||||||
int32_t levelNum = taosArrayGetSize(pDag->pSubplans);
|
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
|
||||||
if (levelNum != 1) {
|
if (levelNum != 1) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* inner = taosArrayGet(pDag->pSubplans, 0);
|
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);
|
||||||
|
|
||||||
int32_t opNum = taosArrayGetSize(inner);
|
int32_t opNum = LIST_LENGTH(inner->pNodeList);
|
||||||
if (opNum != 1) {
|
if (opNum != 1) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
SSubplan* plan = taosArrayGetP(inner, 0);
|
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
|
||||||
|
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
|
Loading…
Reference in New Issue