feature/qnode

This commit is contained in:
dapan1121 2022-01-04 11:15:00 +08:00
parent d74b91afb9
commit 1dba25656a
6 changed files with 23 additions and 18 deletions

View File

@ -147,7 +147,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag**
// @subplan subplan to be schedule
// @templateId templateId of a group of datasource subplans of this @subplan
// @ep one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep);
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep);
int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str);

View File

@ -101,7 +101,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag);
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep);
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep);
int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len);
int32_t stringToSubplan(const char* str, SSubplan** subplan);

View File

@ -336,6 +336,6 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
return TSDB_CODE_SUCCESS;
}
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) {
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
//todo
}

View File

@ -46,7 +46,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag)
return TSDB_CODE_SUCCESS;
}
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) {
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
return setSubplanExecutionNode(subplan, templateId, ep);
}

View File

@ -404,8 +404,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
return TSDB_CODE_SUCCESS;
}
} else {
strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn));
job->resEp.port = task->execAddr.port;
strncpy(job->resEp.fqdn, task->execAddr.epAddr[task->execAddr.inUse].fqdn, sizeof(job->resEp.fqdn));
job->resEp.port = task->execAddr.epAddr[task->execAddr.inUse].port;
}
job->fetchTask = task;

View File

@ -54,10 +54,11 @@ void schtBuildQueryDag(SQueryDag *dag) {
scanPlan.id.templateId = 0x0000000000000002;
scanPlan.id.subplanId = 0x0000000000000003;
scanPlan.type = QUERY_TYPE_SCAN;
scanPlan.level = 1;
scanPlan.execEpSet.numOfEps = 1;
scanPlan.execEpSet.port[0] = 6030;
strcpy(scanPlan.execEpSet.fqdn[0], "ep0");
scanPlan.execNode.numOfEps = 1;
scanPlan.execNode.nodeId = 1;
scanPlan.execNode.inUse = 0;
scanPlan.execNode.epAddr[0].port = 6030;
strcpy(scanPlan.execNode.epAddr[0].fqdn, "ep0");
scanPlan.pChildern = NULL;
scanPlan.pParents = taosArrayInit(1, POINTER_BYTES);
scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
@ -67,7 +68,7 @@ void schtBuildQueryDag(SQueryDag *dag) {
mergePlan.id.subplanId = 0x5555555555;
mergePlan.type = QUERY_TYPE_MERGE;
mergePlan.level = 0;
mergePlan.execEpSet.numOfEps = 0;
mergePlan.execNode.numOfEps = 0;
mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES);
mergePlan.pParents = NULL;
mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
@ -97,9 +98,11 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan[0].id.subplanId = 0x0000000000000004;
insertPlan[0].type = QUERY_TYPE_MODIFY;
insertPlan[0].level = 0;
insertPlan[0].execEpSet.numOfEps = 1;
insertPlan[0].execEpSet.port[0] = 6030;
strcpy(insertPlan[0].execEpSet.fqdn[0], "ep0");
insertPlan[0].execNode.numOfEps = 1;
insertPlan[0].execNode.nodeId = 1;
insertPlan[0].execNode.inUse = 0;
insertPlan[0].execNode.epAddr[0].port = 6030;
strcpy(insertPlan[0].execNode.epAddr[0].fqdn, "ep0");
insertPlan[0].pChildern = NULL;
insertPlan[0].pParents = NULL;
insertPlan[0].pNode = NULL;
@ -110,9 +113,11 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan[1].id.subplanId = 0x0000000000000005;
insertPlan[1].type = QUERY_TYPE_MODIFY;
insertPlan[1].level = 0;
insertPlan[1].execEpSet.numOfEps = 1;
insertPlan[1].execEpSet.port[0] = 6030;
strcpy(insertPlan[1].execEpSet.fqdn[0], "ep1");
insertPlan[1].execNode.numOfEps = 1;
insertPlan[1].execNode.nodeId = 1;
insertPlan[1].execNode.inUse = 1;
insertPlan[1].execNode.epAddr[0].port = 6030;
strcpy(insertPlan[1].execNode.epAddr[0].fqdn, "ep1");
insertPlan[1].pChildern = NULL;
insertPlan[1].pParents = NULL;
insertPlan[1].pNode = NULL;
@ -132,7 +137,7 @@ int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
return 0;
}
int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) {
int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
return 0;
}