From 7791d68d48d139df3f7c8ccef69928b096f86cf3 Mon Sep 17 00:00:00 2001 From: dapan Date: Sun, 9 Jan 2022 18:08:04 +0800 Subject: [PATCH 1/2] feature/qnode --- source/libs/parser/src/astValidate.c | 3 ++- source/libs/planner/src/physicalPlan.c | 1 + source/libs/scheduler/src/scheduler.c | 4 +++- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index 5cabbb5e3b..faa8c526a0 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -3952,7 +3952,8 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt pQueryInfo->pTableMetaInfo[0]->pTableMeta = pmt; pQueryInfo->pTableMetaInfo[0]->name = *name; pQueryInfo->numOfTables = 1; - + pQueryInfo->pTableMetaInfo[0]->tagColList = taosArrayInit(4, POINTER_BYTES); + code = setTableVgroupList(pCtx, name, &pQueryInfo->pTableMetaInfo[0]->vgroupList); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(data.pTableMeta); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index bbb84223ac..47f86cdab1 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -237,6 +237,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) { STORE_CURRENT_SUBPLAN(pCxt); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); + subplan->msgType = TDMT_VND_QUERY; vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execNode); subplan->pNode = createMultiTableScanNode(pPlanNode, pTable); subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 3f8c75a78c..cbb1218bb4 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -23,6 +23,7 @@ static SSchedulerMgmt schMgmt = {0}; int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) { int32_t code = 0; +/* if (oriStatus == newStatus) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } @@ -77,6 +78,7 @@ int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) { qError("invalid task status:%d", oriStatus); return TSDB_CODE_QRY_APP_ERROR; } +*/ return TSDB_CODE_SUCCESS; @@ -539,7 +541,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) SCH_ERR_RET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry)); if (!needRetry) { - SCH_TASK_ELOG("task failed[%x], no more retry", errCode); + SCH_TASK_ELOG("task failed and no more retry, code:%x", errCode); if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved)); From 180eba5b23a0c8ad24bb02593c89e2ccd8f49b8d Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 10 Jan 2022 09:32:10 +0800 Subject: [PATCH 2/2] feature/qnode --- source/libs/planner/src/physicalPlan.c | 1 + source/libs/scheduler/src/scheduler.c | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 47f86cdab1..547e3cc53c 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -362,4 +362,5 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { //todo + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index cbb1218bb4..25b8ec981e 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -463,6 +463,7 @@ int32_t schProcessOnDataFetched(SSchJob *job) { int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { bool moved = false; + int32_t code = 0; SCH_ERR_RET(schMoveTaskToSuccList(pJob, pTask, &moved)); if (!moved) { @@ -524,7 +525,11 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { atomic_add_fetch_32(&par->childReady, 1); - SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->execAddr)); + code = qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->execAddr); + if (code) { + SCH_TASK_ELOG("qSetSubplanExecutionNode failed, code:%x, templateId:%"PRIx64, code, pTask->plan->id.templateId); + SCH_ERR_RET(code); + } if (SCH_TASK_READY_TO_LUNCH(par)) { SCH_ERR_RET(schLaunchTask(pJob, par));