From 014471530fbb573128f11f9bc7c362b0585c5a5b Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sun, 9 Jan 2022 21:31:23 -0500 Subject: [PATCH] TD-12678 qSetSubplanExecutionNode interface implement --- include/libs/planner/planner.h | 2 +- source/libs/planner/inc/plannerInt.h | 2 +- source/libs/planner/src/physicalPlan.c | 87 +++++++++++++++++++------- source/libs/planner/src/planner.c | 4 +- source/libs/scheduler/src/scheduler.c | 2 +- 5 files changed, 68 insertions(+), 29 deletions(-) diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 9e3cfbb0b0..9a7ffa8c18 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -155,7 +155,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** // @subplan subplan to be schedule // @templateId templateId of a group of datasource subplans of this @subplan // @ep one execution location of this group of datasource subplans -int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); +void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str); diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 63d2e9b855..2a50752c88 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -106,7 +106,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str); int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId); -int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); +void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len); int32_t stringToSubplan(const char* str, SSubplan** subplan); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index bbb84223ac..48953a35a7 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -75,24 +75,16 @@ int32_t dsinkNameToDsinkType(const char* name) { return DSINK_Unknown; } -static SDataSink* initDataSink(int32_t type, int32_t size) { - SDataSink* sink = (SDataSink*)validPointer(calloc(1, size)); - sink->info.type = type; - sink->info.name = dsinkTypeToDsinkName(type); - return sink; -} - -static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { - SDataDispatcher* dispatcher = (SDataDispatcher*)initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher)); - return (SDataSink*)dispatcher; -} - -static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks) { - SDataInserter* inserter = (SDataInserter*)initDataSink(DSINK_Insert, sizeof(SDataInserter)); - inserter->numOfTables = pBlocks->numOfTables; - inserter->size = pBlocks->size; - SWAP(inserter->pData, pBlocks->pData, char*); - return (SDataSink*)inserter; +static bool copySchema(SDataBlockSchema* dst, const SDataBlockSchema* src) { + dst->pSchema = malloc(sizeof(SSlotSchema) * src->numOfCols); + if (NULL == dst->pSchema) { + return false; + } + memcpy(dst->pSchema, src->pSchema, sizeof(SSlotSchema) * src->numOfCols); + dst->numOfCols = src->numOfCols; + dst->resultRowSize = src->resultRowSize; + dst->precision = src->precision; + return true; } static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) { @@ -102,6 +94,10 @@ static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataB return false; } memcpy(dataBlockSchema->pSchema, pPlanNode->pSchema, sizeof(SSlotSchema) * pPlanNode->numOfCols); + dataBlockSchema->resultRowSize = 0; + for (int32_t i = 0; i < dataBlockSchema->numOfCols; ++i) { + dataBlockSchema->resultRowSize += dataBlockSchema->pSchema[i].bytes; + } return true; } @@ -120,13 +116,37 @@ static bool cloneExprArray(SArray** dst, SArray* src) { return (TSDB_CODE_SUCCESS == copyAllExprInfo(*dst, src, true) ? true : false); } +static SDataSink* initDataSink(int32_t type, int32_t size, const SPhyNode* pRoot) { + SDataSink* sink = (SDataSink*)validPointer(calloc(1, size)); + sink->info.type = type; + sink->info.name = dsinkTypeToDsinkName(type); + if (NULL !=pRoot && !copySchema(&sink->schema, &pRoot->targetSchema)) { + tfree(sink); + THROW(TSDB_CODE_TSC_OUT_OF_MEMORY); + } + return sink; +} + +static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks, const SPhyNode* pRoot) { + SDataInserter* inserter = (SDataInserter*)initDataSink(DSINK_Insert, sizeof(SDataInserter), pRoot); + inserter->numOfTables = pBlocks->numOfTables; + inserter->size = pBlocks->size; + SWAP(inserter->pData, pBlocks->pData, char*); + return (SDataSink*)inserter; +} + +static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, const SPhyNode* pRoot) { + SDataDispatcher* dispatcher = (SDataDispatcher*)initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher), pRoot); + return (SDataSink*)dispatcher; +} + static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) { SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size)); node->info.type = type; node->info.name = opTypeToOpName(type); if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) { free(node); - return NULL; + THROW(TSDB_CODE_TSC_OUT_OF_MEMORY); } return node; } @@ -239,7 +259,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execNode); subplan->pNode = createMultiTableScanNode(pPlanNode, pTable); - subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode); + subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode, subplan->pNode); RECOVERY_CURRENT_SUBPLAN(pCxt); } return pCxt->nextId.templateId++; @@ -248,6 +268,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) { SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode)); node->srcTemplateId = srcTemplateId; + node->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SQueryNodeAddr))); return (SPhyNode*)node; } @@ -313,7 +334,7 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i); vgroupInfoToEpSet(&blocks->vg, &subplan->execNode); - subplan->pDataSink = createDataInserter(pCxt, blocks); + subplan->pDataSink = createDataInserter(pCxt, blocks, NULL); subplan->pNode = NULL; subplan->type = QUERY_TYPE_MODIFY; subplan->msgType = pPayload->msgType; @@ -332,7 +353,7 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { subplan->msgType = TDMT_VND_QUERY; subplan->pNode = createPhyNode(pCxt, pRoot); - subplan->pDataSink = createDataDispatcher(pCxt, pRoot); + subplan->pDataSink = createDataDispatcher(pCxt, pRoot, subplan->pNode); } // todo deal subquery } @@ -359,6 +380,24 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD return TSDB_CODE_SUCCESS; } -int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { - //todo +void setExchangSourceNode(uint64_t templateId, SQueryNodeAddr* pEp, SPhyNode* pNode) { + if (NULL == pNode) { + return; + } + if (OP_Exchange == pNode->info.type) { + SExchangePhyNode* pExchange = (SExchangePhyNode*)pNode; + if (templateId == pExchange->srcTemplateId) { + taosArrayPush(pExchange->pSrcEndPoints, pEp); + } + } + if (pNode->pChildren != NULL) { + size_t size = taosArrayGetSize(pNode->pChildren); + for(int32_t i = 0; i < size; ++i) { + setExchangSourceNode(templateId, pEp, taosArrayGetP(pNode->pChildren, i)); + } + } +} + +void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* pEp) { + setExchangSourceNode(templateId, pEp, subplan->pNode); } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 54bddd0e3f..f80a631413 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -88,8 +88,8 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, return TSDB_CODE_SUCCESS; } -int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { - return setSubplanExecutionNode(subplan, templateId, ep); +void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { + setSubplanExecutionNode(subplan, templateId, ep); } int32_t qSubPlanToString(const SSubplan *subplan, char** str, int32_t* len) { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 3f8c75a78c..93e8aa7178 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -522,7 +522,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { atomic_add_fetch_32(&par->childReady, 1); - SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->execAddr)); + qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->execAddr); if (SCH_TASK_READY_TO_LUNCH(par)) { SCH_ERR_RET(schLaunchTask(pJob, par));