From 5bc490d2aeb67ffdbbad5d668f9b8dc07c9cb461 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 31 Dec 2021 04:30:37 -0500 Subject: [PATCH 1/6] TD-12506 SSubplan struct modify --- include/libs/planner/planner.h | 3 ++- include/libs/qcom/query.h | 7 +++++++ include/libs/scheduler/scheduler.h | 7 ------- source/libs/planner/src/physicalPlan.c | 24 ++++++++++++------------ 4 files changed, 21 insertions(+), 20 deletions(-) diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 3ca923b0aa..f1a9447e9c 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -20,6 +20,7 @@ extern "C" { #endif +#include "query.h" #include "tmsg.h" #include "tarray.h" @@ -122,7 +123,7 @@ typedef struct SSubplan { SSubplanId id; // unique id of the subplan int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY int32_t level; // the execution level of current subplan, starting from 0. - SEpSet execEpSet; // for the scan/modify subplan, the optional execution node + SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node SArray *pChildern; // the datasource subplan,from which to fetch the result SArray *pParents; // the data destination subplan, get data from current subplan SPhyNode *pNode; // physical plan of current subplan diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 4d5b1a8bd3..4bfb774435 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -111,6 +111,13 @@ typedef struct SMsgSendInfo { SDataBuf msgInfo; } SMsgSendInfo; +typedef struct SQueryNodeAddr{ + int32_t nodeId; //vgId or qnodeId + int8_t inUse; + int8_t numOfEps; + SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; +} SQueryNodeAddr; + bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); int32_t initTaskQueue(); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index d6cac976d4..b2ba7acebf 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -50,13 +50,6 @@ typedef struct SQueryProfileSummary { uint64_t resultSize; // generated result size in Kb. } SQueryProfileSummary; -typedef struct SQueryNodeAddr{ - int32_t nodeId; //vgId or qnodeId - int8_t inUse; - int8_t numOfEps; - SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; -} SQueryNodeAddr; - typedef struct SQueryResult { int32_t code; uint64_t numOfRows; diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 97c9cec7c7..22a1beaa35 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -211,22 +211,22 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { return subplan; } -static void vgroupInfoToEpSet(const SVgroupInfo* vg, SEpSet* epSet) { - epSet->inUse = 0; // todo - epSet->numOfEps = vg->numOfEps; +static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) { + execNode->nodeId = vg->vgId; + execNode->inUse = 0; // todo + execNode->numOfEps = vg->numOfEps; for (int8_t i = 0; i < vg->numOfEps; ++i) { - epSet->port[i] = vg->epAddr[i].port; - strcpy(epSet->fqdn[i], vg->epAddr[i].fqdn); + execNode->epAddr[i] = vg->epAddr[i]; } return; } -static void vgroupMsgToEpSet(const SVgroupMsg* vg, SEpSet* epSet) { - epSet->inUse = 0; // todo - epSet->numOfEps = vg->numOfEps; +static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) { + execNode->nodeId = vg->vgId; + execNode->inUse = 0; // todo + execNode->numOfEps = vg->numOfEps; for (int8_t i = 0; i < vg->numOfEps; ++i) { - epSet->port[i] = vg->epAddr[i].port; - strcpy(epSet->fqdn[i], vg->epAddr[i].fqdn); + execNode->epAddr[i] = vg->epAddr[i]; } return; } @@ -236,7 +236,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) { STORE_CURRENT_SUBPLAN(pCxt); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); - vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execEpSet); + vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execNode); subplan->pNode = createMultiTableScanNode(pPlanNode, pTable); subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode); RECOVERY_CURRENT_SUBPLAN(pCxt); @@ -297,7 +297,7 @@ static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { STORE_CURRENT_SUBPLAN(pCxt); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY); SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i); - vgroupInfoToEpSet(&blocks->vg, &subplan->execEpSet); + vgroupInfoToEpSet(&blocks->vg, &subplan->execNode); subplan->pNode = NULL; subplan->pDataSink = createDataInserter(pCxt, blocks); subplan->type = QUERY_TYPE_MODIFY; From d74b91afb94776f90db99feff8cb13d9f347a7f7 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 31 Dec 2021 19:31:35 +0800 Subject: [PATCH 2/6] feature/qnode --- include/common/tmsg.h | 3 + source/libs/scheduler/inc/schedulerInt.h | 24 +++---- source/libs/scheduler/src/scheduler.c | 79 ++++++++++++++++++------ 3 files changed, 77 insertions(+), 29 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 10aba94656..7f3a73f96e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1090,6 +1090,7 @@ typedef struct SResFetchMsg { } SResFetchMsg; typedef struct SSchTasksStatusMsg { + SMsgHead header; uint64_t sId; } SSchTasksStatusMsg; @@ -1105,6 +1106,7 @@ typedef struct SSchedulerStatusRsp { } SSchedulerStatusRsp; typedef struct STaskCancelMsg { + SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; @@ -1115,6 +1117,7 @@ typedef struct STaskCancelRsp { } STaskCancelRsp; typedef struct STaskDropMsg { + SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 2770f7e21a..fa4ae0d152 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -60,17 +60,19 @@ typedef struct SSchLevel { typedef struct SSchTask { - uint64_t taskId; // task id - SSchLevel *level; // level - SSubplan *plan; // subplan - char *msg; // operator tree - int32_t msgLen; // msg length - int8_t status; // task status - SEpAddr execAddr; // task actual executed node address - SQueryProfileSummary summary; // task execution summary - int32_t childReady; // child task ready number - SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* - SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* + uint64_t taskId; // task id + SSchLevel *level; // level + SSubplan *plan; // subplan + char *msg; // operator tree + int32_t msgLen; // msg length + int8_t status; // task status + SQueryNodeAddr execAddr; // task actual executed node address + int8_t condidateIdx; // current try condidation index + SArray *condidateAddrs; // condidate node addresses, element is SQueryNodeAddr + SQueryProfileSummary summary; // task execution summary + int32_t childReady; // child task ready number + SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* + SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* } SSchTask; typedef struct SSchJobAttr { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 20eb94c2ff..090c7b6fa7 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -215,28 +215,49 @@ _return: SCH_RET(code); } -int32_t schSetTaskExecEpSet(SSchJob *job, SEpSet *epSet) { - if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) { +int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) { + if (task->condidateAddrs) { return TSDB_CODE_SUCCESS; } - int32_t nodeNum = taosArrayGetSize(job->nodeList); - - for (int32_t i = 0; i < nodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) { - SEpAddr *addr = taosArrayGet(job->nodeList, i); - - strncpy(epSet->fqdn[epSet->numOfEps], addr->fqdn, sizeof(addr->fqdn)); - epSet->port[epSet->numOfEps] = addr->port; - - ++epSet->numOfEps; + task->condidateIdx = 0; + task->condidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); + if (NULL == task->condidateAddrs) { + qError("taosArrayInit failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - for (int32_t i = 0; i < job->dataSrcEps.numOfEps && epSet->numOfEps < tListLen(epSet->port); ++i) { + if (task->plan->execNode.numOfEps > 0) { + if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) { + qError("taosArrayPush failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + return TSDB_CODE_SUCCESS; + } + + int32_t addNum = 0; + int32_t nodeNum = taosArrayGetSize(job->nodeList); + + for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { + SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i); + + if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) { + qError("taosArrayPush failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + ++addNum; + } + +/* + for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; ++epSet->numOfEps; } +*/ return TSDB_CODE_SUCCESS; } @@ -394,12 +415,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } +/* if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) { strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn)); job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port; ++job->dataSrcEps.numOfEps; } +*/ for (int32_t i = 0; i < parentNum; ++i) { SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i); @@ -633,6 +656,16 @@ _return: SCH_RET(code); } +void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { + epSet->inUse = addr->inUse; + epSet->numOfEps = addr->numOfEps; + + for (int8_t i = 0; i < epSet->numOfEps; ++i) { + strncpy(epSet->fqdn[i], addr->epAddr[i].fqdn, sizeof(addr->epAddr[i].fqdn)); + epSet->port[i] = addr->epAddr[i].port; + } +} + int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { uint32_t msgSize = 0; @@ -665,6 +698,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { SSubQueryMsg *pMsg = msg; + pMsg->header.vgId = htonl(task->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); @@ -681,6 +715,8 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } SResReadyMsg *pMsg = msg; + + pMsg->header.vgId = htonl(task->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); @@ -698,6 +734,8 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } SResFetchMsg *pMsg = msg; + + pMsg->header.vgId = htonl(task->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); @@ -712,6 +750,8 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } STaskDropMsg *pMsg = msg; + + pMsg->header.vgId = htonl(task->plan->execNode.nodeId); pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); @@ -723,7 +763,12 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { break; } - SCH_ERR_JRET(schAsyncSendMsg(job->transport, &task->plan->execEpSet, job->queryId, task->taskId, msgType, msg, msgSize)); + SEpSet epSet; + SQueryNodeAddr *addr = taosArrayGet(task->condidateAddrs, task->condidateIdx); + + schConvertAddrToEpSet(addr, &epSet); + + SCH_ERR_JRET(schAsyncSendMsg(job->transport, &epSet, job->queryId, task->taskId, msgType, msg, msgSize)); return TSDB_CODE_SUCCESS; @@ -737,12 +782,10 @@ _return: int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SSubplan *plan = task->plan; SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); - if (plan->execEpSet.numOfEps <= 0) { - SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet)); - } + SCH_ERR_RET(schSetTaskCondidateAddrs(job, task)); - if (plan->execEpSet.numOfEps <= 0) { - SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps); + if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) { + SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } From 1dba25656af04ef9597f1e6f6c09cb5b04f86cda Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 4 Jan 2022 11:15:00 +0800 Subject: [PATCH 3/6] feature/qnode --- include/libs/planner/planner.h | 2 +- source/libs/planner/inc/plannerInt.h | 2 +- source/libs/planner/src/physicalPlan.c | 2 +- source/libs/planner/src/planner.c | 2 +- source/libs/scheduler/src/scheduler.c | 4 +-- source/libs/scheduler/test/schedulerTests.cpp | 29 +++++++++++-------- 6 files changed, 23 insertions(+), 18 deletions(-) diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index f1a9447e9c..81a6d59e8d 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -147,7 +147,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** // @subplan subplan to be schedule // @templateId templateId of a group of datasource subplans of this @subplan // @ep one execution location of this group of datasource subplans -int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep); +int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str); diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 35c6d59ffe..61d622155f 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -101,7 +101,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str); int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag); -int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep); +int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len); int32_t stringToSubplan(const char* str, SSubplan** subplan); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 22a1beaa35..715090d4b3 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -336,6 +336,6 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD return TSDB_CODE_SUCCESS; } -int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) { +int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { //todo } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 6381771400..96556ee3ff 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -46,7 +46,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag) return TSDB_CODE_SUCCESS; } -int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) { +int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { return setSubplanExecutionNode(subplan, templateId, ep); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 090c7b6fa7..68ccb16fc3 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -404,8 +404,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } } else { - strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn)); - job->resEp.port = task->execAddr.port; + strncpy(job->resEp.fqdn, task->execAddr.epAddr[task->execAddr.inUse].fqdn, sizeof(job->resEp.fqdn)); + job->resEp.port = task->execAddr.epAddr[task->execAddr.inUse].port; } job->fetchTask = task; diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index b418ade172..2c8eafb825 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -54,10 +54,11 @@ void schtBuildQueryDag(SQueryDag *dag) { scanPlan.id.templateId = 0x0000000000000002; scanPlan.id.subplanId = 0x0000000000000003; scanPlan.type = QUERY_TYPE_SCAN; - scanPlan.level = 1; - scanPlan.execEpSet.numOfEps = 1; - scanPlan.execEpSet.port[0] = 6030; - strcpy(scanPlan.execEpSet.fqdn[0], "ep0"); + scanPlan.execNode.numOfEps = 1; + scanPlan.execNode.nodeId = 1; + scanPlan.execNode.inUse = 0; + scanPlan.execNode.epAddr[0].port = 6030; + strcpy(scanPlan.execNode.epAddr[0].fqdn, "ep0"); scanPlan.pChildern = NULL; scanPlan.pParents = taosArrayInit(1, POINTER_BYTES); scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); @@ -67,7 +68,7 @@ void schtBuildQueryDag(SQueryDag *dag) { mergePlan.id.subplanId = 0x5555555555; mergePlan.type = QUERY_TYPE_MERGE; mergePlan.level = 0; - mergePlan.execEpSet.numOfEps = 0; + mergePlan.execNode.numOfEps = 0; mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES); mergePlan.pParents = NULL; mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); @@ -97,9 +98,11 @@ void schtBuildInsertDag(SQueryDag *dag) { insertPlan[0].id.subplanId = 0x0000000000000004; insertPlan[0].type = QUERY_TYPE_MODIFY; insertPlan[0].level = 0; - insertPlan[0].execEpSet.numOfEps = 1; - insertPlan[0].execEpSet.port[0] = 6030; - strcpy(insertPlan[0].execEpSet.fqdn[0], "ep0"); + insertPlan[0].execNode.numOfEps = 1; + insertPlan[0].execNode.nodeId = 1; + insertPlan[0].execNode.inUse = 0; + insertPlan[0].execNode.epAddr[0].port = 6030; + strcpy(insertPlan[0].execNode.epAddr[0].fqdn, "ep0"); insertPlan[0].pChildern = NULL; insertPlan[0].pParents = NULL; insertPlan[0].pNode = NULL; @@ -110,9 +113,11 @@ void schtBuildInsertDag(SQueryDag *dag) { insertPlan[1].id.subplanId = 0x0000000000000005; insertPlan[1].type = QUERY_TYPE_MODIFY; insertPlan[1].level = 0; - insertPlan[1].execEpSet.numOfEps = 1; - insertPlan[1].execEpSet.port[0] = 6030; - strcpy(insertPlan[1].execEpSet.fqdn[0], "ep1"); + insertPlan[1].execNode.numOfEps = 1; + insertPlan[1].execNode.nodeId = 1; + insertPlan[1].execNode.inUse = 1; + insertPlan[1].execNode.epAddr[0].port = 6030; + strcpy(insertPlan[1].execNode.epAddr[0].fqdn, "ep1"); insertPlan[1].pChildern = NULL; insertPlan[1].pParents = NULL; insertPlan[1].pNode = NULL; @@ -132,7 +137,7 @@ int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) { return 0; } -int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) { +int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { return 0; } From 04d5badb3ce4f0442d3f92dfdfe65a145cdf4b09 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 4 Jan 2022 03:46:55 +0000 Subject: [PATCH 4/6] add forward compatilibity --- source/util/src/encode.c | 3 +-- source/util/test/encodeTest.cpp | 46 +++++++++++++++++++++------------ 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/source/util/src/encode.c b/source/util/src/encode.c index 40c03ea051..096509d42a 100644 --- a/source/util/src/encode.c +++ b/source/util/src/encode.c @@ -127,15 +127,14 @@ void tEndDecode(SCoder* pCoder) { struct SCoderNode* pNode; ASSERT(pCoder->type == TD_DECODER); - ASSERT(tDecodeIsEnd(pCoder)); pNode = TD_SLIST_HEAD(&(pCoder->stack)); ASSERT(pNode); TD_SLIST_POP(&(pCoder->stack)); pCoder->data = pNode->data; + pCoder->pos = pCoder->size + pNode->pos; pCoder->size = pNode->size; - pCoder->pos = pCoder->pos + pNode->pos; free(pNode); } diff --git a/source/util/test/encodeTest.cpp b/source/util/test/encodeTest.cpp index 1a861701d7..b178ee0b10 100644 --- a/source/util/test/encodeTest.cpp +++ b/source/util/test/encodeTest.cpp @@ -360,8 +360,8 @@ TEST(td_encode_test, compound_struct_encode_test) { SStructA_v2 sa2 = {.A_a = 10, .A_b = 65478, .A_c = "Hello", .A_d = 67, .A_e = 13}; SFinalReq_v1 req1 = {.pA = &sa1, .v_a = 15, .v_b = 35}; SFinalReq_v2 req2 = {.pA = &sa2, .v_a = 15, .v_b = 32, .v_c = 37}; - SFinalReq_v1 dreq1; - SFinalReq_v2 dreq21, dreq22; + SFinalReq_v1 dreq11, dreq21; + SFinalReq_v2 dreq12, dreq22; // Get size tCoderInit(&encoder, TD_LITTLE_ENDIAN, nullptr, 0, TD_ENCODER); @@ -386,27 +386,30 @@ TEST(td_encode_test, compound_struct_encode_test) { tCoderClear(&encoder); // Decode + // buf1 -> req1 tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf1, buf1size, TD_DECODER); - GTEST_ASSERT_EQ(tSFinalReq_v1_decode(&decoder, &dreq1), 0); - GTEST_ASSERT_EQ(dreq1.pA->A_a, req1.pA->A_a); - GTEST_ASSERT_EQ(dreq1.pA->A_b, req1.pA->A_b); - GTEST_ASSERT_EQ(strcmp(dreq1.pA->A_c, req1.pA->A_c), 0); - GTEST_ASSERT_EQ(dreq1.v_a, req1.v_a); - GTEST_ASSERT_EQ(dreq1.v_b, req1.v_b); + GTEST_ASSERT_EQ(tSFinalReq_v1_decode(&decoder, &dreq11), 0); + GTEST_ASSERT_EQ(dreq11.pA->A_a, req1.pA->A_a); + GTEST_ASSERT_EQ(dreq11.pA->A_b, req1.pA->A_b); + GTEST_ASSERT_EQ(strcmp(dreq11.pA->A_c, req1.pA->A_c), 0); + GTEST_ASSERT_EQ(dreq11.v_a, req1.v_a); + GTEST_ASSERT_EQ(dreq11.v_b, req1.v_b); tCoderClear(&decoder); + // buf1 -> req2 (backward compatibility) tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf1, buf1size, TD_DECODER); - GTEST_ASSERT_EQ(tSFinalReq_v2_decode(&decoder, &dreq21), 0); - GTEST_ASSERT_EQ(dreq21.pA->A_a, req1.pA->A_a); - GTEST_ASSERT_EQ(dreq21.pA->A_b, req1.pA->A_b); - GTEST_ASSERT_EQ(strcmp(dreq21.pA->A_c, req1.pA->A_c), 0); - GTEST_ASSERT_EQ(dreq21.pA->A_d, 0); - GTEST_ASSERT_EQ(dreq21.pA->A_e, 0); - GTEST_ASSERT_EQ(dreq21.v_a, req1.v_a); - GTEST_ASSERT_EQ(dreq21.v_b, req1.v_b); - GTEST_ASSERT_EQ(dreq21.v_c, 0); + GTEST_ASSERT_EQ(tSFinalReq_v2_decode(&decoder, &dreq12), 0); + GTEST_ASSERT_EQ(dreq12.pA->A_a, req1.pA->A_a); + GTEST_ASSERT_EQ(dreq12.pA->A_b, req1.pA->A_b); + GTEST_ASSERT_EQ(strcmp(dreq12.pA->A_c, req1.pA->A_c), 0); + GTEST_ASSERT_EQ(dreq12.pA->A_d, 0); + GTEST_ASSERT_EQ(dreq12.pA->A_e, 0); + GTEST_ASSERT_EQ(dreq12.v_a, req1.v_a); + GTEST_ASSERT_EQ(dreq12.v_b, req1.v_b); + GTEST_ASSERT_EQ(dreq12.v_c, 0); tCoderClear(&decoder); + // buf2 -> req2 tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf2, buf2size, TD_DECODER); GTEST_ASSERT_EQ(tSFinalReq_v2_decode(&decoder, &dreq22), 0); GTEST_ASSERT_EQ(dreq22.pA->A_a, req2.pA->A_a); @@ -418,4 +421,13 @@ TEST(td_encode_test, compound_struct_encode_test) { GTEST_ASSERT_EQ(dreq22.v_b, req2.v_b); GTEST_ASSERT_EQ(dreq22.v_c, req2.v_c); tCoderClear(&decoder); + + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf2, buf2size, TD_DECODER); + GTEST_ASSERT_EQ(tSFinalReq_v1_decode(&decoder, &dreq21), 0); + GTEST_ASSERT_EQ(dreq21.pA->A_a, req2.pA->A_a); + GTEST_ASSERT_EQ(dreq21.pA->A_b, req2.pA->A_b); + GTEST_ASSERT_EQ(strcmp(dreq21.pA->A_c, req2.pA->A_c), 0); + GTEST_ASSERT_EQ(dreq21.v_a, req2.v_a); + GTEST_ASSERT_EQ(dreq21.v_b, req2.v_b); + tCoderClear(&decoder); } \ No newline at end of file From f548a08401c37bbb5a0d9ed4178de8da31f8da38 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 4 Jan 2022 03:55:41 +0000 Subject: [PATCH 5/6] fix big endian bug --- source/util/src/encode.c | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/source/util/src/encode.c b/source/util/src/encode.c index 096509d42a..758d3f442d 100644 --- a/source/util/src/encode.c +++ b/source/util/src/encode.c @@ -88,13 +88,9 @@ void tEndEncode(SCoder* pCoder) { pCoder->size = pNode->size; pCoder->pos = pNode->pos; - if (TD_RT_ENDIAN() == pCoder->endian) { - tPut(int32_t, pCoder->data + pCoder->pos, len); - } else { - tRPut32(pCoder->data + pCoder->pos, len); - } + tEncodeI32(pCoder, len); - TD_CODER_MOVE_POS(pCoder, len + sizeof(int32_t)); + TD_CODER_MOVE_POS(pCoder, len); free(pNode); } From c5e3fdde863ffa8acc5d4301c8dc591397768d3e Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 4 Jan 2022 05:39:43 +0000 Subject: [PATCH 6/6] show tables not show stb --- source/dnode/vnode/meta/src/metaBDBImpl.c | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/meta/src/metaBDBImpl.c b/source/dnode/vnode/meta/src/metaBDBImpl.c index 4254ad0acd..b8ad7ab235 100644 --- a/source/dnode/vnode/meta/src/metaBDBImpl.c +++ b/source/dnode/vnode/meta/src/metaBDBImpl.c @@ -577,11 +577,16 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) { STbCfg tbCfg; void * pBuf; - if (pTbCur->pCur->get(pTbCur->pCur, &key, &value, DB_NEXT) == 0) { - pBuf = value.data; - metaDecodeTbInfo(pBuf, &tbCfg); - return tbCfg.name; - } else { - return NULL; + for (;;) { + if (pTbCur->pCur->get(pTbCur->pCur, &key, &value, DB_NEXT) == 0) { + pBuf = value.data; + metaDecodeTbInfo(pBuf, &tbCfg); + if (tbCfg.type == META_SUPER_TABLE) { + continue; + } + return tbCfg.name; + } else { + return NULL; + } } } \ No newline at end of file