fix: scheduler ut issues

This commit is contained in:
dapan1121 2024-10-28 18:44:41 +08:00
parent c31eadea21
commit cea25e7f9d
3 changed files with 22 additions and 3 deletions

View File

@ -371,6 +371,8 @@ int32_t qptGetColumnRandLen(int32_t colType) {
assert(0); assert(0);
break; break;
} }
return 0;
} }

View File

@ -334,7 +334,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
HASH_NO_LOCK); HASH_NO_LOCK);
if (NULL == planToTask) { if (NULL == planToTask) {
SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM); SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM);
SCH_ERR_RET(terrno); SCH_ERR_JRET(terrno);
} }
pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel)); pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
@ -363,7 +363,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
pLevel = taosArrayGet(pJob->levels, i); pLevel = taosArrayGet(pJob->levels, i);
if (NULL == pLevel) { if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, levelNum); SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, levelNum);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
pLevel->level = i; pLevel->level = i;

View File

@ -203,6 +203,10 @@ void schtBuildQueryDag(SQueryPlan *dag) {
return; return;
} }
scanPlan->msgType = TDMT_SCH_QUERY; scanPlan->msgType = TDMT_SCH_QUERY;
code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH, (SNode**)&scanPlan->pDataSink);
if (NULL == scanPlan->pDataSink) {
return;
}
mergePlan->id.queryId = qId; mergePlan->id.queryId = qId;
mergePlan->id.groupId = schtMergeTemplateId; mergePlan->id.groupId = schtMergeTemplateId;
@ -223,6 +227,10 @@ void schtBuildQueryDag(SQueryPlan *dag) {
return; return;
} }
mergePlan->msgType = TDMT_SCH_QUERY; mergePlan->msgType = TDMT_SCH_QUERY;
code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH, (SNode**)&mergePlan->pDataSink);
if (NULL == mergePlan->pDataSink) {
return;
}
merge->pNodeList = NULL; merge->pNodeList = NULL;
code = nodesMakeList(&merge->pNodeList); code = nodesMakeList(&merge->pNodeList);
@ -235,6 +243,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
return; return;
} }
(void)nodesListAppend(merge->pNodeList, (SNode *)mergePlan); (void)nodesListAppend(merge->pNodeList, (SNode *)mergePlan);
(void)nodesListAppend(scan->pNodeList, (SNode *)scanPlan); (void)nodesListAppend(scan->pNodeList, (SNode *)scanPlan);
@ -250,7 +259,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
int32_t scanPlanNum = 20; int32_t scanPlanNum = 20;
dag->queryId = qId; dag->queryId = qId;
dag->numOfSubplans = 2; dag->numOfSubplans = scanPlanNum + 1;
dag->pSubplans = NULL; dag->pSubplans = NULL;
int32_t code = nodesMakeList(&dag->pSubplans); int32_t code = nodesMakeList(&dag->pSubplans);
if (NULL == dag->pSubplans) { if (NULL == dag->pSubplans) {
@ -289,6 +298,10 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
if (NULL == mergePlan->pChildren) { if (NULL == mergePlan->pChildren) {
return; return;
} }
code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH, (SNode**)&mergePlan->pDataSink);
if (NULL == mergePlan->pDataSink) {
return;
}
for (int32_t i = 0; i < scanPlanNum; ++i) { for (int32_t i = 0; i < scanPlanNum; ++i) {
SSubplan *scanPlan = NULL; SSubplan *scanPlan = NULL;
@ -322,6 +335,10 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
return; return;
} }
scanPlan->msgType = TDMT_SCH_QUERY; scanPlan->msgType = TDMT_SCH_QUERY;
code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH, (SNode**)&scanPlan->pDataSink);
if (NULL == scanPlan->pDataSink) {
return;
}
(void)nodesListAppend(scanPlan->pParents, (SNode *)mergePlan); (void)nodesListAppend(scanPlan->pParents, (SNode *)mergePlan);
(void)nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan); (void)nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan);