From 1dba25656af04ef9597f1e6f6c09cb5b04f86cda Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 4 Jan 2022 11:15:00 +0800 Subject: [PATCH] feature/qnode --- include/libs/planner/planner.h | 2 +- source/libs/planner/inc/plannerInt.h | 2 +- source/libs/planner/src/physicalPlan.c | 2 +- source/libs/planner/src/planner.c | 2 +- source/libs/scheduler/src/scheduler.c | 4 +-- source/libs/scheduler/test/schedulerTests.cpp | 29 +++++++++++-------- 6 files changed, 23 insertions(+), 18 deletions(-) diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index f1a9447e9c..81a6d59e8d 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -147,7 +147,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** // @subplan subplan to be schedule // @templateId templateId of a group of datasource subplans of this @subplan // @ep one execution location of this group of datasource subplans -int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep); +int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str); diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 35c6d59ffe..61d622155f 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -101,7 +101,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str); int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag); -int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep); +int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len); int32_t stringToSubplan(const char* str, SSubplan** subplan); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 22a1beaa35..715090d4b3 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -336,6 +336,6 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD return TSDB_CODE_SUCCESS; } -int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) { +int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { //todo } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 6381771400..96556ee3ff 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -46,7 +46,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag) return TSDB_CODE_SUCCESS; } -int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) { +int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { return setSubplanExecutionNode(subplan, templateId, ep); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 090c7b6fa7..68ccb16fc3 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -404,8 +404,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } } else { - strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn)); - job->resEp.port = task->execAddr.port; + strncpy(job->resEp.fqdn, task->execAddr.epAddr[task->execAddr.inUse].fqdn, sizeof(job->resEp.fqdn)); + job->resEp.port = task->execAddr.epAddr[task->execAddr.inUse].port; } job->fetchTask = task; diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index b418ade172..2c8eafb825 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -54,10 +54,11 @@ void schtBuildQueryDag(SQueryDag *dag) { scanPlan.id.templateId = 0x0000000000000002; scanPlan.id.subplanId = 0x0000000000000003; scanPlan.type = QUERY_TYPE_SCAN; - scanPlan.level = 1; - scanPlan.execEpSet.numOfEps = 1; - scanPlan.execEpSet.port[0] = 6030; - strcpy(scanPlan.execEpSet.fqdn[0], "ep0"); + scanPlan.execNode.numOfEps = 1; + scanPlan.execNode.nodeId = 1; + scanPlan.execNode.inUse = 0; + scanPlan.execNode.epAddr[0].port = 6030; + strcpy(scanPlan.execNode.epAddr[0].fqdn, "ep0"); scanPlan.pChildern = NULL; scanPlan.pParents = taosArrayInit(1, POINTER_BYTES); scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); @@ -67,7 +68,7 @@ void schtBuildQueryDag(SQueryDag *dag) { mergePlan.id.subplanId = 0x5555555555; mergePlan.type = QUERY_TYPE_MERGE; mergePlan.level = 0; - mergePlan.execEpSet.numOfEps = 0; + mergePlan.execNode.numOfEps = 0; mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES); mergePlan.pParents = NULL; mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); @@ -97,9 +98,11 @@ void schtBuildInsertDag(SQueryDag *dag) { insertPlan[0].id.subplanId = 0x0000000000000004; insertPlan[0].type = QUERY_TYPE_MODIFY; insertPlan[0].level = 0; - insertPlan[0].execEpSet.numOfEps = 1; - insertPlan[0].execEpSet.port[0] = 6030; - strcpy(insertPlan[0].execEpSet.fqdn[0], "ep0"); + insertPlan[0].execNode.numOfEps = 1; + insertPlan[0].execNode.nodeId = 1; + insertPlan[0].execNode.inUse = 0; + insertPlan[0].execNode.epAddr[0].port = 6030; + strcpy(insertPlan[0].execNode.epAddr[0].fqdn, "ep0"); insertPlan[0].pChildern = NULL; insertPlan[0].pParents = NULL; insertPlan[0].pNode = NULL; @@ -110,9 +113,11 @@ void schtBuildInsertDag(SQueryDag *dag) { insertPlan[1].id.subplanId = 0x0000000000000005; insertPlan[1].type = QUERY_TYPE_MODIFY; insertPlan[1].level = 0; - insertPlan[1].execEpSet.numOfEps = 1; - insertPlan[1].execEpSet.port[0] = 6030; - strcpy(insertPlan[1].execEpSet.fqdn[0], "ep1"); + insertPlan[1].execNode.numOfEps = 1; + insertPlan[1].execNode.nodeId = 1; + insertPlan[1].execNode.inUse = 1; + insertPlan[1].execNode.epAddr[0].port = 6030; + strcpy(insertPlan[1].execNode.epAddr[0].fqdn, "ep1"); insertPlan[1].pChildern = NULL; insertPlan[1].pParents = NULL; insertPlan[1].pNode = NULL; @@ -132,7 +137,7 @@ int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) { return 0; } -int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) { +int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { return 0; }