From 4e5df8e3c3369dce2e7cf33688d0fedc1dec6d64 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 25 Mar 2022 05:15:45 -0400 Subject: [PATCH 1/5] bugfix --- source/libs/executor/src/executorimpl.c | 2 +- source/libs/planner/src/planPhysiCreater.c | 322 +++++++++++++-------- 2 files changed, 202 insertions(+), 122 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 177d09be76..2db9406418 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5569,7 +5569,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) { blockDataCleanup(pInfo->pRes); - int32_t tableNameSlotId = 1; + int32_t tableNameSlotId = 0; SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, tableNameSlotId); char * name = NULL; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index fb9a2fc532..c3ad61dd37 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -41,63 +41,147 @@ static int32_t getSlotKey(SNode* pNode, char* pKey) { return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName); } -static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId) { +static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId, bool output) { SSlotDescNode* pSlot = (SSlotDescNode*)nodesMakeNode(QUERY_NODE_SLOT_DESC); - CHECK_ALLOC(pSlot, NULL); + if (NULL == pSlot) { + return NULL; + } pSlot->slotId = slotId; pSlot->dataType = ((SExprNode*)pNode)->resType; pSlot->reserve = false; - pSlot->output = true; + pSlot->output = output; return (SNode*)pSlot; } -static SNode* createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId) { +static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, SNode** pOutput) { STargetNode* pTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET); if (NULL == pTarget) { - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } + pTarget->dataBlockId = dataBlockId; pTarget->slotId = slotId; pTarget->pExpr = pNode; - return (SNode*)pTarget; + + *pOutput = (SNode*)pTarget; + return TSDB_CODE_SUCCESS; } -static int32_t addDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) { - SHashObj* pHash = NULL; - if (NULL == pDataBlockDesc->pSlots) { - pDataBlockDesc->pSlots = nodesMakeList(); - CHECK_ALLOC(pDataBlockDesc->pSlots, TSDB_CODE_OUT_OF_MEMORY); +static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) { + SSlotIndex index = { .dataBlockId = dataBlockId, .slotId = slotId }; + return taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex)); +} - pHash = taosHashInit(LIST_LENGTH(pList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - CHECK_ALLOC(pHash, TSDB_CODE_OUT_OF_MEMORY); - if (NULL == taosArrayInsert(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId, &pHash)) { - taosHashCleanup(pHash); - return TSDB_CODE_OUT_OF_MEMORY; - } - } else { - pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId); +static int32_t putSlotToHash(int16_t dataBlockId, int16_t slotId, SNode* pNode, SHashObj* pHash) { + char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN]; + int32_t len = getSlotKey(pNode, name); + return putSlotToHashImpl(dataBlockId, slotId, name, len, pHash); +} + +static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId, SHashObj** pDescHash) { + SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (NULL == pHash) { + return TSDB_CODE_OUT_OF_MEMORY; } - - SNode* pNode = NULL; - int16_t slotId = taosHashGetSize(pHash); - FOREACH(pNode, pList) { - CHECK_CODE_EXT(nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, slotId))); - - SSlotIndex index = { .dataBlockId = pDataBlockDesc->dataBlockId, .slotId = slotId }; - char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN]; - int32_t len = getSlotKey(pNode, name); - CHECK_CODE(taosHashPut(pHash, name, len, &index, sizeof(SSlotIndex)), TSDB_CODE_OUT_OF_MEMORY); - - SNode* pTarget = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId); - CHECK_ALLOC(pTarget, TSDB_CODE_OUT_OF_MEMORY); - REPLACE_NODE(pTarget); - - pDataBlockDesc->resultRowSize += ((SExprNode*)pNode)->resType.bytes; - ++slotId; + if (NULL == taosArrayInsert(pCxt->pLocationHelper, dataBlockId, &pHash)) { + taosHashCleanup(pHash); + return TSDB_CODE_OUT_OF_MEMORY; } + + *pDescHash = pHash; return TSDB_CODE_SUCCESS; } +static int32_t buildDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, SHashObj* pHash) { + pDataBlockDesc->pSlots = nodesMakeList(); + if (NULL == pDataBlockDesc->pSlots) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t code = TSDB_CODE_SUCCESS; + int16_t slotId = 0; + SNode* pNode = NULL; + FOREACH(pNode, pList) { + code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, slotId, true)); + if (TSDB_CODE_SUCCESS == code) { + code = putSlotToHash(pDataBlockDesc->dataBlockId, slotId, pNode, pHash); + } + if (TSDB_CODE_SUCCESS == code) { + pDataBlockDesc->resultRowSize += ((SExprNode*)pNode)->resType.bytes; + ++slotId; + } else { + break; + } + } + return code; +} + +static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode** pDataBlockDesc) { + SDataBlockDescNode* pDesc = nodesMakeNode(QUERY_NODE_DATABLOCK_DESC); + if (NULL == pDesc) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pDesc->dataBlockId = pCxt->nextDataBlockId++; + + SHashObj* pHash = NULL; + int32_t code = createDataBlockDescHash(pCxt, LIST_LENGTH(pList), pDesc->dataBlockId, &pHash); + if (TSDB_CODE_SUCCESS == code) { + code = buildDataBlockSlots(pCxt, pList, pDesc, pHash); + } + + if (TSDB_CODE_SUCCESS == code) { + *pDataBlockDesc = pDesc; + } else { + nodesDestroyNode(pDesc); + } + + return code; +} + +static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, bool output) { + int32_t code = TSDB_CODE_SUCCESS; + SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId); + int16_t nextSlotId = taosHashGetSize(pHash), slotId = 0; + SNode* pNode = NULL; + FOREACH(pNode, pList) { + char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0}; + int32_t len = getSlotKey(pNode, name); + SSlotIndex* pIndex = taosHashGet(pHash, name, len); + if (NULL == pIndex) { + code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, nextSlotId, output)); + if (TSDB_CODE_SUCCESS == code) { + code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, nextSlotId, name, len, pHash); + } + pDataBlockDesc->resultRowSize += ((SExprNode*)pNode)->resType.bytes; + slotId = nextSlotId; + ++nextSlotId; + } else { + slotId = pIndex->slotId; + } + + if (TSDB_CODE_SUCCESS == code) { + SNode* pTarget = NULL; + code = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId, &pTarget); + if (TSDB_CODE_SUCCESS == code) { + REPLACE_NODE(pTarget); + } + } + + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + return code; +} + +static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) { + return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, false); +} + +static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) { + return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, false); +} + typedef struct SSetSlotIdCxt { int32_t errCode; SHashObj* pLeftHash; @@ -114,10 +198,11 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) { pIndex = taosHashGet(pCxt->pRightHash, name, len); } // pIndex is definitely not NULL, otherwise it is a bug - CHECK_ALLOC(pIndex, DEAL_RES_ERROR); + if (NULL == pIndex) { + return DEAL_RES_ERROR; + } ((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId; ((SColumnNode*)pNode)->slotId = pIndex->slotId; - CHECK_ALLOC(pNode, DEAL_RES_ERROR); return DEAL_RES_IGNORE_CHILD; } return DEAL_RES_CONTINUE; @@ -144,7 +229,7 @@ static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, i return TSDB_CODE_SUCCESS; } -static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNodeList* pList, SNodeList** pOutput) { +static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, const SNodeList* pList, SNodeList** pOutput) { SNodeList* pRes = nodesCloneList(pList); if (NULL == pRes) { return TSDB_CODE_OUT_OF_MEMORY; @@ -164,18 +249,17 @@ static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, i return TSDB_CODE_SUCCESS; } -static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, ENodeType type) { +static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, ENodeType type) { SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type); if (NULL == pPhysiNode) { return NULL; } - pPhysiNode->pOutputDataBlockDesc = nodesMakeNode(QUERY_NODE_DATABLOCK_DESC); - if (NULL == pPhysiNode->pOutputDataBlockDesc) { + + int32_t code = createDataBlockDesc(pCxt, pLogicNode->pTargets, &pPhysiNode->pOutputDataBlockDesc); + if (TSDB_CODE_SUCCESS != code) { nodesDestroyNode(pPhysiNode); return NULL; } - pPhysiNode->pOutputDataBlockDesc->dataBlockId = pCxt->nextDataBlockId++; - pPhysiNode->pOutputDataBlockDesc->type = QUERY_NODE_DATABLOCK_DESC; return pPhysiNode; } @@ -186,24 +270,11 @@ static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pL return TSDB_CODE_SUCCESS; } -static int32_t setSlotOutput(SPhysiPlanContext* pCxt, SNodeList* pTargets, SDataBlockDescNode* pDataBlockDesc) { - SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId); - char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN]; - SNode* pNode; - FOREACH(pNode, pTargets) { - int32_t len = getSlotKey(pNode, name); - SSlotIndex* pIndex = taosHashGet(pHash, name, len); - // pIndex is definitely not NULL, otherwise it is a bug - CHECK_ALLOC(pIndex, TSDB_CODE_FAILED); - ((SSlotDescNode*)nodesListGetNode(pDataBlockDesc->pSlots, pIndex->slotId))->output = true; - } - - return TSDB_CODE_SUCCESS; -} - static SNodeptr createPrimaryKeyCol(SPhysiPlanContext* pCxt, uint64_t tableId) { SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN); - CHECK_ALLOC(pCol, NULL); + if (NULL == pCol) { + return NULL; + } pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; pCol->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes; pCol->tableId = tableId; @@ -244,8 +315,12 @@ static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhys if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanPhysiNode) || QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN == nodeType(pScanPhysiNode)) { pScanPhysiNode->pScanCols = nodesMakeList(); - CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY); - CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid))); + if (NULL == pScanPhysiNode->pScanCols) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid))) { + return TSDB_CODE_OUT_OF_MEMORY; + } SNode* pNode; FOREACH(pNode, pScanCols) { @@ -255,29 +330,29 @@ static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhys strcpy(pCol->colName, ((SColumnNode*)pNode)->colName); continue; } - CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, nodesCloneNode(pNode))); + if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pScanPhysiNode->pScanCols, nodesCloneNode(pNode))) { + return TSDB_CODE_OUT_OF_MEMORY; + } } } else { pScanPhysiNode->pScanCols = nodesCloneList(pScanCols); - CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY); + if (NULL == pScanPhysiNode->pScanCols) { + return TSDB_CODE_OUT_OF_MEMORY; + } } - // return sortScanCols(pScanPhysiNode->pScanCols); - return TSDB_CODE_SUCCESS; + return sortScanCols(pScanPhysiNode->pScanCols); } static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) { int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols); if (TSDB_CODE_SUCCESS == code) { // Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t - code = addDataBlockDesc(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc); + code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc); } if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode); } - if (TSDB_CODE_SUCCESS == code) { - code = setSlotOutput(pCxt, pScanLogicNode->node.pTargets, pScanPhysiNode->node.pOutputDataBlockDesc); - } if (TSDB_CODE_SUCCESS == code) { pScanPhysiNode->uid = pScanLogicNode->pMeta->uid; pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType; @@ -302,7 +377,7 @@ static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAdd } static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) { - STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN); + STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN); if (NULL == pTagScan) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -310,7 +385,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* p } static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) { - STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); + STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); if (NULL == pTableScan) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -326,7 +401,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp } static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) { - SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN); + SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN); if (NULL == pScan) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -348,7 +423,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* } static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) { - SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); + SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); if (NULL == pScan) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -412,7 +487,7 @@ static int32_t createJoinOutputCols(SPhysiPlanContext* pCxt, SDataBlockDescNode* } static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, SPhysiNode** pPhyNode) { - SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_JOIN); + SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_JOIN); if (NULL == pJoin) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -425,14 +500,11 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren code = createJoinOutputCols(pCxt, pLeftDesc, pRightDesc, &pJoin->pTargets); } if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockDesc(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc); + code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc); } if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); } - if (TSDB_CODE_SUCCESS == code) { - code = setSlotOutput(pCxt, pJoinLogicNode->node.pTargets, pJoin->node.pOutputDataBlockDesc); - } if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pJoin; @@ -452,7 +524,9 @@ typedef struct SRewritePrecalcExprsCxt { static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) { SNode* pExpr = nodesCloneNode(*pNode); - CHECK_ALLOC(pExpr, DEAL_RES_ERROR); + if (NULL == pExpr) { + return DEAL_RES_ERROR; + } if (nodesListAppend(pCxt->pPrecalcExprs, pExpr)) { nodesDestroyNode(pExpr); return DEAL_RES_ERROR; @@ -500,11 +574,15 @@ static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SN if (NULL == *pPrecalcExprs) { *pPrecalcExprs = nodesMakeList(); - CHECK_ALLOC(*pPrecalcExprs, TSDB_CODE_OUT_OF_MEMORY); + if (NULL == *pPrecalcExprs) { + return TSDB_CODE_OUT_OF_MEMORY; + } } if (NULL == *pRewrittenList) { *pRewrittenList = nodesMakeList(); - CHECK_ALLOC(*pRewrittenList, TSDB_CODE_OUT_OF_MEMORY); + if (NULL == *pRewrittenList) { + return TSDB_CODE_OUT_OF_MEMORY; + } } SNode* pNode = NULL; FOREACH(pNode, pList) { @@ -514,8 +592,12 @@ static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SN } else { pNew = nodesCloneNode(pNode); } - CHECK_ALLOC(pNew, TSDB_CODE_OUT_OF_MEMORY); - CHECK_CODE(nodesListAppend(*pRewrittenList, pNew), TSDB_CODE_OUT_OF_MEMORY); + if (NULL == pNew) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (TSDB_CODE_SUCCESS != nodesListAppend(*pRewrittenList, pNew)) { + return TSDB_CODE_OUT_OF_MEMORY; + } } SRewritePrecalcExprsCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pPrecalcExprs = *pPrecalcExprs }; nodesRewriteList(*pRewrittenList, doRewritePrecalcExprs, &cxt); @@ -527,7 +609,7 @@ static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SN } static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode, SPhysiNode** pPhyNode) { - SAggPhysiNode* pAgg = (SAggPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_AGG); + SAggPhysiNode* pAgg = (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_AGG); if (NULL == pAgg) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -545,30 +627,27 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) { code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAgg->pExprs); if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockDesc(pCxt, pAgg->pExprs, pChildTupe); + code = pushdownDataBlockSlots(pCxt, pAgg->pExprs, pChildTupe); } } if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) { code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pGroupKeys, &pAgg->pGroupKeys); if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockDesc(pCxt, pAgg->pGroupKeys, pAgg->node.pOutputDataBlockDesc); + code = addDataBlockSlots(pCxt, pAgg->pGroupKeys, pAgg->node.pOutputDataBlockDesc); } } if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncs) { code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pAggFuncs, &pAgg->pAggFuncs); if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockDesc(pCxt, pAgg->pAggFuncs, pAgg->node.pOutputDataBlockDesc); + code = addDataBlockSlots(pCxt, pAgg->pAggFuncs, pAgg->node.pOutputDataBlockDesc); } } if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg); } - if (TSDB_CODE_SUCCESS == code) { - code = setSlotOutput(pCxt, pAggLogicNode->node.pTargets, pAgg->node.pOutputDataBlockDesc); - } if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pAgg; @@ -576,18 +655,22 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, nodesDestroyNode(pAgg); } + nodesDestroyList(pPrecalcExprs); + nodesDestroyList(pGroupKeys); + nodesDestroyList(pAggFuncs); + return code; } static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) { - SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_PROJECT); + SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pProjectLogicNode, QUERY_NODE_PHYSICAL_PLAN_PROJECT); if (NULL == pProject) { return TSDB_CODE_OUT_OF_MEMORY; } int32_t code = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc->dataBlockId, -1, pProjectLogicNode->pProjections, &pProject->pProjections); if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockDesc(pCxt, pProject->pProjections, pProject->node.pOutputDataBlockDesc); + code = addDataBlockSlots(pCxt, pProject->pProjections, pProject->node.pOutputDataBlockDesc); } if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject); @@ -603,34 +686,30 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild } static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode, SPhysiNode** pPhyNode) { - SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE); + SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE); if (NULL == pExchange) { return TSDB_CODE_OUT_OF_MEMORY; } pExchange->srcGroupId = pExchangeLogicNode->srcGroupId; - int32_t code = addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pExchange->node.pOutputDataBlockDesc); + *pPhyNode = (SPhysiNode*)pExchange; - if (TSDB_CODE_SUCCESS == code) { - *pPhyNode = (SPhysiNode*)pExchange; - } else { - nodesDestroyNode(pExchange); - } - - return code; + return TSDB_CODE_SUCCESS; } static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode, SPhysiNode** pPhyNode) { - SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); + SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); if (NULL == pScan) { return TSDB_CODE_OUT_OF_MEMORY; } - int32_t code = addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pScan->node.pOutputDataBlockDesc); + int32_t code = TSDB_CODE_SUCCESS; + + pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets); + if (NULL == pScan->pScanCols) { + code = TSDB_CODE_OUT_OF_MEMORY; + } if (TSDB_CODE_SUCCESS == code) { - pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets); - if (NULL == pScan->pScanCols) { - code = TSDB_CODE_OUT_OF_MEMORY; - } + code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc); } if (TSDB_CODE_SUCCESS == code) { @@ -660,21 +739,17 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) { code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pWindow->pExprs); if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockDesc(pCxt, pWindow->pExprs, pChildTupe); + code = addDataBlockSlots(pCxt, pWindow->pExprs, pChildTupe); } } if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) { code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs); if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockDesc(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc); + code = addDataBlockSlots(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc); } } - if (TSDB_CODE_SUCCESS == code) { - code = setSlotOutput(pCxt, pWindowLogicNode->node.pTargets, pWindow->node.pOutputDataBlockDesc); - } - if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pWindow; } else { @@ -685,7 +760,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* } static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { - SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_INTERVAL); + SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_INTERVAL); if (NULL == pInterval) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -706,7 +781,7 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil } static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { - SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW); + SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW); if (NULL == pSession) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -875,17 +950,22 @@ static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t l SNodeListNode* pGroup; if (level >= LIST_LENGTH(pSubplans)) { pGroup = nodesMakeNode(QUERY_NODE_NODE_LIST); - CHECK_ALLOC(pGroup, TSDB_CODE_OUT_OF_MEMORY); - CHECK_CODE(nodesListStrictAppend(pSubplans, pGroup), TSDB_CODE_OUT_OF_MEMORY); + if (NULL == pGroup) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pSubplans, pGroup)) { + return TSDB_CODE_OUT_OF_MEMORY; + } } else { pGroup = nodesListGetNode(pSubplans, level); } if (NULL == pGroup->pNodeList) { pGroup->pNodeList = nodesMakeList(); - CHECK_ALLOC(pGroup->pNodeList, TSDB_CODE_OUT_OF_MEMORY); + if (NULL == pGroup->pNodeList) { + return TSDB_CODE_OUT_OF_MEMORY; + } } - CHECK_CODE(nodesListStrictAppend(pGroup->pNodeList, pSubplan), TSDB_CODE_OUT_OF_MEMORY); - return TSDB_CODE_SUCCESS; + return nodesListStrictAppend(pGroup->pNodeList, pSubplan); } static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pParent, SQueryPlan* pQueryPlan) { From f6bcae191314a419b785de1ad2bcbeaef0c2232b Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 26 Mar 2022 01:39:10 -0400 Subject: [PATCH 2/5] sort scan cols --- include/libs/nodes/plannodes.h | 1 + include/libs/nodes/querynodes.h | 1 + source/libs/executor/src/executorimpl.c | 13 +++++++---- source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/parser/src/parAstCreater.c | 6 +++++ source/libs/parser/src/parTranslater.c | 2 ++ source/libs/planner/src/planLogicCreater.c | 10 +++++---- source/libs/planner/src/planPhysiCreater.c | 26 +++++++++++++++------- source/libs/planner/test/plannerTest.cpp | 10 ++++++++- source/libs/qworker/inc/qworkerInt.h | 1 + source/libs/qworker/src/qworker.c | 2 ++ 11 files changed, 56 insertions(+), 17 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 794e0ca85a..421e9042b7 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -65,6 +65,7 @@ typedef struct SAggLogicNode { typedef struct SProjectLogicNode { SLogicNode node; SNodeList* pProjections; + char stmtName[TSDB_TABLE_NAME_LEN]; } SProjectLogicNode; typedef struct SVnodeModifLogicNode { diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 616e24d67d..5609bb4269 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -231,6 +231,7 @@ typedef struct SSelectStmt { SNodeList* pOrderByList; // SOrderByExprNode SNode* pLimit; SNode* pSlimit; + char stmtName[TSDB_TABLE_NAME_LEN]; } SSelectStmt; typedef enum ESetOperatorType { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index fdf216c7eb..a601b9b0b7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5557,7 +5557,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) { blockDataCleanup(pInfo->pRes); - int32_t tableNameSlotId = 0; + int32_t tableNameSlotId = 1; SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, tableNameSlotId); char * name = NULL; @@ -8644,9 +8644,14 @@ SArray* extractScanColumnId(SNodeList* pNodeList) { } for(int32_t i = 0; i < numOfCols; ++i) { - STargetNode* pNode = (STargetNode*) nodesListGetNode(pNodeList, i); - SColumnNode* pColNode = (SColumnNode*) pNode->pExpr; - taosArrayPush(pList, &pColNode->colId); + for (int32_t j = 0; j < numOfCols; ++j) { + STargetNode* pNode = (STargetNode*) nodesListGetNode(pNodeList, j); + if (pNode->slotId == i) { + SColumnNode* pColNode = (SColumnNode*) pNode->pExpr; + taosArrayPush(pList, &pColNode->colId); + break; + } + } } return pList; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 60692323f5..3bc5e5d823 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -250,6 +250,7 @@ static SNode* logicAggCopy(const SAggLogicNode* pSrc, SAggLogicNode* pDst) { static SNode* logicProjectCopy(const SProjectLogicNode* pSrc, SProjectLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); CLONE_NODE_LIST_FIELD(pProjections); + COPY_CHAR_ARRAY_FIELD(stmtName); return (SNode*)pDst; } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 4f27743b2b..5b2f36878b 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -645,6 +645,11 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok tempTable->pSubquery = pSubquery; if (NULL != pTableAlias && TK_NK_NIL != pTableAlias->type) { strncpy(tempTable->table.tableAlias, pTableAlias->z, pTableAlias->n); + } else { + sprintf(tempTable->table.tableAlias, "%p", tempTable); + } + if (QUERY_NODE_SELECT_STMT == nodeType(pSubquery)) { + strcpy(((SSelectStmt*)pSubquery)->stmtName, tempTable->table.tableAlias); } return (SNode*)tempTable; } @@ -792,6 +797,7 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr select->isDistinct = isDistinct; select->pProjectionList = pProjectionList; select->pFromTable = pTable; + sprintf(select->stmtName, "%p", select); return (SNode*)select; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8c842db161..19c6cb3f27 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1850,6 +1850,7 @@ static int32_t createSelectStmtForShow(ENodeType showType, SSelectStmt** pStmt) if (NULL == pSelect) { return TSDB_CODE_OUT_OF_MEMORY; } + sprintf(pSelect->stmtName, "%p", pSelect); SRealTableNode* pTable = nodesMakeNode(QUERY_NODE_REAL_TABLE); if (NULL == pTable) { @@ -1858,6 +1859,7 @@ static int32_t createSelectStmtForShow(ENodeType showType, SSelectStmt** pStmt) } strcpy(pTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB); strcpy(pTable->table.tableName, getSysTableName(showType)); + strcpy(pTable->table.tableAlias, pTable->table.tableName); pSelect->pFromTable = (SNode*)pTable; *pStmt = pSelect; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 84fa52a070..6111597ebc 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -290,13 +290,14 @@ static int32_t createLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pSel return code; } -static SColumnNode* createColumnByExpr(SExprNode* pExpr) { +static SColumnNode* createColumnByExpr(const char* pStmtName, SExprNode* pExpr) { SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pCol) { return NULL; } pCol->node.resType = pExpr->resType; strcpy(pCol->colName, pExpr->aliasName); + strcpy(pCol->tableAlias, pStmtName); return pCol; } @@ -484,7 +485,7 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele return TSDB_CODE_FAILED; } -static int32_t createColumnByProjections(SLogicPlanContext* pCxt, SNodeList* pExprs, SNodeList** pCols) { +static int32_t createColumnByProjections(SLogicPlanContext* pCxt, const char* pStmtName, SNodeList* pExprs, SNodeList** pCols) { SNodeList* pList = nodesMakeList(); if (NULL == pList) { return TSDB_CODE_OUT_OF_MEMORY; @@ -492,7 +493,7 @@ static int32_t createColumnByProjections(SLogicPlanContext* pCxt, SNodeList* pEx SNode* pNode; FOREACH(pNode, pExprs) { - if (TSDB_CODE_SUCCESS != nodesListAppend(pList, createColumnByExpr((SExprNode*)pNode))) { + if (TSDB_CODE_SUCCESS != nodesListAppend(pList, createColumnByExpr(pStmtName, (SExprNode*)pNode))) { nodesDestroyList(pList); return TSDB_CODE_OUT_OF_MEMORY; } @@ -514,9 +515,10 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel if (NULL == pProject->pProjections) { code = TSDB_CODE_OUT_OF_MEMORY; } + strcpy(pProject->stmtName, pSelect->stmtName); if (TSDB_CODE_SUCCESS == code) { - code = createColumnByProjections(pCxt,pSelect->pProjectionList, &pProject->node.pTargets); + code = createColumnByProjections(pCxt, pSelect->stmtName, pSelect->pProjectionList, &pProject->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index c3ad61dd37..26f0a0cdaa 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -30,14 +30,20 @@ typedef struct SPhysiPlanContext { SArray* pExecNodeList; } SPhysiPlanContext; -static int32_t getSlotKey(SNode* pNode, char* pKey) { +static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { SColumnNode* pCol = (SColumnNode*)pNode; + if (NULL != pStmtName) { + return sprintf(pKey, "%s.%s", pStmtName, pCol->node.aliasName); + } if ('\0' == pCol->tableAlias[0]) { return sprintf(pKey, "%s", pCol->colName); } return sprintf(pKey, "%s.%s", pCol->tableAlias, pCol->colName); } + if (NULL != pStmtName) { + return sprintf(pKey, "%s.%s", pStmtName, ((SExprNode*)pNode)->aliasName); + } return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName); } @@ -74,7 +80,7 @@ static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char static int32_t putSlotToHash(int16_t dataBlockId, int16_t slotId, SNode* pNode, SHashObj* pHash) { char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN]; - int32_t len = getSlotKey(pNode, name); + int32_t len = getSlotKey(pNode, NULL, name); return putSlotToHashImpl(dataBlockId, slotId, name, len, pHash); } @@ -138,14 +144,14 @@ static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SD return code; } -static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, bool output) { +static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, const char* pStmtName, bool output) { int32_t code = TSDB_CODE_SUCCESS; SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId); int16_t nextSlotId = taosHashGetSize(pHash), slotId = 0; SNode* pNode = NULL; FOREACH(pNode, pList) { char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0}; - int32_t len = getSlotKey(pNode, name); + int32_t len = getSlotKey(pNode, pStmtName, name); SSlotIndex* pIndex = taosHashGet(pHash, name, len); if (NULL == pIndex) { code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, nextSlotId, output)); @@ -175,11 +181,15 @@ static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, } static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) { - return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, false); + return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, false); +} + +static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) { + return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, true); } static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) { - return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, false); + return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, true); } typedef struct SSetSlotIdCxt { @@ -192,7 +202,7 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode) && 0 != strcmp(((SColumnNode*)pNode)->colName, "*")) { SSetSlotIdCxt* pCxt = (SSetSlotIdCxt*)pContext; char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN]; - int32_t len = getSlotKey(pNode, name); + int32_t len = getSlotKey(pNode, NULL, name); SSlotIndex* pIndex = taosHashGet(pCxt->pLeftHash, name, len); if (NULL == pIndex) { pIndex = taosHashGet(pCxt->pRightHash, name, len); @@ -670,7 +680,7 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild int32_t code = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc->dataBlockId, -1, pProjectLogicNode->pProjections, &pProject->pProjections); if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockSlots(pCxt, pProject->pProjections, pProject->node.pOutputDataBlockDesc); + code = addDataBlockSlotsForProject(pCxt, pProjectLogicNode->stmtName, pProject->pProjections, pProject->node.pOutputDataBlockDesc); } if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject); diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index 319e539fa8..c318652b76 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -170,7 +170,7 @@ TEST_F(PlannerTest, groupBy) { bind("SELECT count(*) FROM t1"); ASSERT_TRUE(run()); - bind("SELECT c1, count(*) FROM t1 GROUP BY c1"); + bind("SELECT c1, max(c3), min(c2), count(*) FROM t1 GROUP BY c1"); ASSERT_TRUE(run()); bind("SELECT c1 + c3, c1 + count(*) FROM t1 where c2 = 'abc' GROUP BY c1, c3"); @@ -205,6 +205,14 @@ TEST_F(PlannerTest, showTables) { setDatabase("root", "test"); bind("show tables"); + ASSERT_TRUE(run()); +} + +TEST_F(PlannerTest, showStables) { + setDatabase("root", "test"); + + bind("show stables"); + ASSERT_TRUE(run()); } TEST_F(PlannerTest, createTopic) { diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index e768aa2de0..9d0dfb4174 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -178,6 +178,7 @@ typedef struct SQWorkerMgmt { #define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) #define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) #define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) +#define QW_TASK_DLOGL(param, ...) qDebugL("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) #define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId) #define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index ae2dccfe39..93ba62170c 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -910,6 +910,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { atomic_store_8(&ctx->taskType, taskType); + QW_TASK_DLOGL("task string : %s", qwMsg->msg); + code = qStringToSubplan(qwMsg->msg, &plan); if (TSDB_CODE_SUCCESS != code) { QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code)); From 10f073e79cc58b9ddc112bff8d611f30efcf26b1 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 26 Mar 2022 02:58:44 -0400 Subject: [PATCH 3/5] order by plan implement --- include/libs/nodes/nodes.h | 1 + include/libs/nodes/plannodes.h | 13 +++- source/libs/nodes/src/nodesCloneFuncs.c | 20 ++++++ source/libs/nodes/src/nodesCodeFuncs.c | 71 +++++++++++++++++++++- source/libs/nodes/src/nodesTraverseFuncs.c | 4 +- source/libs/nodes/src/nodesUtilFuncs.c | 6 +- source/libs/planner/inc/planInt.h | 26 -------- source/libs/planner/src/planLogicCreater.c | 56 ++++++++++++++--- source/libs/planner/src/planPhysiCreater.c | 42 +++++++++++++ source/libs/planner/src/planSpliter.c | 4 +- source/libs/planner/test/plannerTest.cpp | 13 ++++ 11 files changed, 215 insertions(+), 41 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 4c83a30bb9..9b8739a4f3 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -120,6 +120,7 @@ typedef enum ENodeType { QUERY_NODE_LOGIC_PLAN_VNODE_MODIF, QUERY_NODE_LOGIC_PLAN_EXCHANGE, QUERY_NODE_LOGIC_PLAN_WINDOW, + QUERY_NODE_LOGIC_PLAN_SORT, QUERY_NODE_LOGIC_SUBPLAN, QUERY_NODE_LOGIC_PLAN, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 421e9042b7..7b0a563907 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -99,6 +99,11 @@ typedef struct SWindowLogicNode { int64_t sessionGap; } SWindowLogicNode; +typedef struct SSortLogicNode { + SLogicNode node; + SNodeList* pSortKeys; +} SSortLogicNode; + typedef enum ESubplanType { SUBPLAN_TYPE_MERGE = 1, SUBPLAN_TYPE_PARTIAL, @@ -198,7 +203,7 @@ typedef struct SJoinPhysiNode { typedef struct SAggPhysiNode { SPhysiNode node; SNodeList* pExprs; // these are expression list of group_by_clause and parameter expression of aggregate function - SNodeList* pGroupKeys; // SColumnRefNode list + SNodeList* pGroupKeys; SNodeList* pAggFuncs; } SAggPhysiNode; @@ -236,6 +241,12 @@ typedef struct SSessionWinodwPhysiNode { int64_t gap; } SSessionWinodwPhysiNode; +typedef struct SSortPhysiNode { + SPhysiNode node; + SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function + SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode +} SSortPhysiNode; + typedef struct SDataSinkNode { ENodeType type; SDataBlockDescNode* pInputDataBlockDesc; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 3bc5e5d823..161427b4b5 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -19,6 +19,11 @@ #include "taos.h" #include "taoserror.h" +#define COPY_ALL_SCALAR_FIELDS \ + do { \ + memcpy((pDst), (pSrc), sizeof(*pSrc)); \ + } while (0) + #define COPY_SCALAR_FIELD(fldname) \ do { \ (pDst)->fldname = (pSrc)->fldname; \ @@ -195,6 +200,12 @@ static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode return (SNode*)pDst; } +static SNode* orderByExprNodeCopy(const SOrderByExprNode* pSrc, SOrderByExprNode* pDst) { + COPY_ALL_SCALAR_FIELDS; + CLONE_NODE_FIELD(pExpr); + return (SNode*)pDst; +} + static SNode* fillNodeCopy(const SFillNode* pSrc, SFillNode* pDst) { COPY_SCALAR_FIELD(mode); CLONE_NODE_FIELD(pValues); @@ -280,6 +291,12 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD return (SNode*)pDst; } +static SNode* logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) { + COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); + CLONE_NODE_LIST_FIELD(pSortKeys); + return (SNode*)pDst; +} + static SNode* logicSubplanCopy(const SLogicSubplan* pSrc, SLogicSubplan* pDst) { CLONE_NODE_FIELD(pNode); COPY_SCALAR_FIELD(subplanType); @@ -339,6 +356,7 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) { case QUERY_NODE_GROUPING_SET: return groupingSetNodeCopy((const SGroupingSetNode*)pNode, (SGroupingSetNode*)pDst); case QUERY_NODE_ORDER_BY_EXPR: + return orderByExprNodeCopy((const SOrderByExprNode*)pNode, (SOrderByExprNode*)pDst); case QUERY_NODE_LIMIT: break; case QUERY_NODE_FILL: @@ -361,6 +379,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) { return logicExchangeCopy((const SExchangeLogicNode*)pNode, (SExchangeLogicNode*)pDst); case QUERY_NODE_LOGIC_PLAN_WINDOW: return logicWindowCopy((const SWindowLogicNode*)pNode, (SWindowLogicNode*)pDst); + case QUERY_NODE_LOGIC_PLAN_SORT: + return logicSortCopy((const SSortLogicNode*)pNode, (SSortLogicNode*)pDst); case QUERY_NODE_LOGIC_SUBPLAN: return logicSubplanCopy((const SLogicSubplan*)pNode, (SLogicSubplan*)pDst); default: diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 37be210610..4225b5c9a8 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -920,6 +920,37 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkSortPhysiPlanExprs = "Exprs"; +static const char* jkSortPhysiPlanSortKeys = "SortKeys"; + +static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) { + const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj; + + int32_t code = physicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkSortPhysiPlanExprs, pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkSortPhysiPlanSortKeys, pNode->pSortKeys); + } + + return code; +} + +static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) { + SSortPhysiNode* pNode = (SSortPhysiNode*)pObj; + + int32_t code = jsonToPhysicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkSortPhysiPlanExprs, &pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkSortPhysiPlanSortKeys, &pNode->pSortKeys); + } + + return code; +} + static const char* jkWindowPhysiPlanExprs = "Exprs"; static const char* jkWindowPhysiPlanFuncs = "Funcs"; @@ -1807,6 +1838,38 @@ static int32_t groupingSetNodeToJson(const void* pObj, SJson* pJson) { return code; } +static const char* jkOrderByExprExpr = "Expr"; +static const char* jkOrderByExprOrder = "Order"; +static const char* jkOrderByExprNullOrder = "NullOrder"; + +static int32_t orderByExprNodeToJson(const void* pObj, SJson* pJson) { + const SOrderByExprNode* pNode = (const SOrderByExprNode*)pObj; + + int32_t code = tjsonAddObject(pJson, jkOrderByExprExpr, nodeToJson, pNode->pExpr); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkOrderByExprOrder, pNode->order); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkOrderByExprNullOrder, pNode->nullOrder); + } + + return code; +} + +static int32_t jsonToOrderByExprNode(const SJson* pJson, void* pObj) { + SOrderByExprNode* pNode = (SOrderByExprNode*)pObj; + + int32_t code = jsonToNodeObject(pJson, jkOrderByExprExpr, &pNode->pExpr); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkOrderByExprOrder, pNode->order); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkOrderByExprNullOrder, pNode->nullOrder); + } + + return code; +} + static const char* jkIntervalWindowInterval = "Interval"; static const char* jkIntervalWindowOffset = "Offset"; static const char* jkIntervalWindowSliding = "Sliding"; @@ -2155,6 +2218,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_GROUPING_SET: return groupingSetNodeToJson(pObj, pJson); case QUERY_NODE_ORDER_BY_EXPR: + return orderByExprNodeToJson(pObj, pJson); case QUERY_NODE_LIMIT: case QUERY_NODE_STATE_WINDOW: case QUERY_NODE_SESSION_WINDOW: @@ -2218,7 +2282,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: return physiExchangeNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_SORT: - break; + return physiSortNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_INTERVAL: return physiIntervalNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: @@ -2258,7 +2322,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { // break; // case QUERY_NODE_GROUPING_SET: // return jsonToGroupingSetNode(pJson, pObj); - // case QUERY_NODE_ORDER_BY_EXPR: + case QUERY_NODE_ORDER_BY_EXPR: + return jsonToOrderByExprNode(pJson, pObj); // case QUERY_NODE_LIMIT: // case QUERY_NODE_STATE_WINDOW: // case QUERY_NODE_SESSION_WINDOW: @@ -2307,6 +2372,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToPhysiAggNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: return jsonToPhysiExchangeNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_SORT: + return jsonToPhysiSortNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_INTERVAL: return jsonToPhysiIntervalNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index ff71c3bd58..3af5a6d8cc 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -294,10 +294,10 @@ void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker wa case SQL_CLAUSE_GROUP_BY: nodesWalkNode(pSelect->pHaving, walker, pContext); case SQL_CLAUSE_HAVING: - nodesWalkList(pSelect->pProjectionList, walker, pContext); - case SQL_CLAUSE_SELECT: nodesWalkList(pSelect->pOrderByList, walker, pContext); case SQL_CLAUSE_ORDER_BY: + nodesWalkList(pSelect->pProjectionList, walker, pContext); + case SQL_CLAUSE_SELECT: default: break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index d529ee2b1a..b22aa4fea4 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -159,6 +159,8 @@ SNodeptr nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SExchangeLogicNode)); case QUERY_NODE_LOGIC_PLAN_WINDOW: return makeNode(type, sizeof(SWindowLogicNode)); + case QUERY_NODE_LOGIC_PLAN_SORT: + return makeNode(type, sizeof(SSortLogicNode)); case QUERY_NODE_LOGIC_SUBPLAN: return makeNode(type, sizeof(SLogicSubplan)); case QUERY_NODE_LOGIC_PLAN: @@ -182,7 +184,7 @@ SNodeptr nodesMakeNode(ENodeType type) { case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: return makeNode(type, sizeof(SExchangePhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_SORT: - return makeNode(type, sizeof(SNode)); + return makeNode(type, sizeof(SSortPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_INTERVAL: return makeNode(type, sizeof(SIntervalPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: @@ -555,7 +557,7 @@ static EDealRes collectColumns(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { SColumnNode* pCol = (SColumnNode*)pNode; int32_t colId = pCol->colId; - if (0 == strcmp(pCxt->pTableAlias, pCol->tableAlias)) { + if (NULL == pCxt->pTableAlias || 0 == strcmp(pCxt->pTableAlias, pCol->tableAlias)) { return doCollect(pCxt, colId, pNode); } } diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index 42449d63d6..144254b042 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -22,32 +22,6 @@ extern "C" { #include "planner.h" -#define CHECK_ALLOC(p, res) \ - do { \ - if (NULL == (p)) { \ - pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \ - return (res); \ - } \ - } while (0) - -#define CHECK_CODE(exec, res) \ - do { \ - int32_t code = (exec); \ - if (TSDB_CODE_SUCCESS != code) { \ - pCxt->errCode = code; \ - return (res); \ - } \ - } while (0) - -#define CHECK_CODE_EXT(exec) \ - do { \ - int32_t code = (exec); \ - if (TSDB_CODE_SUCCESS != code) { \ - pCxt->errCode = code; \ - return code; \ - } \ - } while (0) - #define planFatal(param, ...) qFatal("PLAN: " param, __VA_ARGS__) #define planError(param, ...) qError("PLAN: " param, __VA_ARGS__) #define planWarn(param, ...) qWarn("PLAN: " param, __VA_ARGS__) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 6111597ebc..7e3b67f809 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -45,7 +45,9 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) { } if (nodesEqualNode(pExpr, *pNode)) { SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); - CHECK_ALLOC(pCol, DEAL_RES_ERROR); + if (NULL == pCol) { + return DEAL_RES_ERROR; + } SExprNode* pToBeRewrittenExpr = (SExprNode*)(*pNode); pCol->node.resType = pToBeRewrittenExpr->resType; strcpy(pCol->node.aliasName, pToBeRewrittenExpr->aliasName); @@ -311,20 +313,22 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) { switch (nodeType(pNode)) { case QUERY_NODE_COLUMN: { SNode* pCol = nodesCloneNode(pNode); - CHECK_ALLOC(pCol, DEAL_RES_ERROR); - CHECK_CODE(nodesListAppend(pCxt->pList, pCol), DEAL_RES_ERROR); - return DEAL_RES_IGNORE_CHILD; + if (NULL == pCol) { + return DEAL_RES_ERROR; + } + return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR); } case QUERY_NODE_OPERATOR: case QUERY_NODE_LOGIC_CONDITION: case QUERY_NODE_FUNCTION: { SExprNode* pExpr = (SExprNode*)pNode; SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); - CHECK_ALLOC(pCol, DEAL_RES_ERROR); + if (NULL == pCol) { + return DEAL_RES_ERROR; + } pCol->node.resType = pExpr->resType; strcpy(pCol->colName, pExpr->aliasName); - CHECK_CODE(nodesListAppend(pCxt->pList, (SNode*)pCol), DEAL_RES_ERROR); - return DEAL_RES_IGNORE_CHILD; + return (TSDB_CODE_SUCCESS == nodesListAppend(pCxt->pList, pCol) ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR); } default: break; @@ -485,6 +489,41 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele return TSDB_CODE_FAILED; } +static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { + if (NULL == pSelect->pOrderByList) { + return TSDB_CODE_SUCCESS; + } + + SSortLogicNode* pSort = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SORT); + if (NULL == pSort) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SNodeList* pCols = NULL; + int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_ORDER_BY, NULL, &pCols); + if (TSDB_CODE_SUCCESS == code && NULL != pCols) { + pSort->node.pTargets = nodesCloneList(pCols); + if (NULL == pSort->node.pTargets) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + if (TSDB_CODE_SUCCESS == code) { + pSort->pSortKeys = nodesCloneList(pSelect->pOrderByList); + if (NULL == pSort->pSortKeys) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + if (TSDB_CODE_SUCCESS == code) { + *pLogicNode = (SLogicNode*)pSort; + } else { + nodesDestroyNode(pSort); + } + + return code; +} + static int32_t createColumnByProjections(SLogicPlanContext* pCxt, const char* pStmtName, SNodeList* pExprs, SNodeList** pCols) { SNodeList* pList = nodesMakeList(); if (NULL == pList) { @@ -539,6 +578,9 @@ static int32_t createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele if (TSDB_CODE_SUCCESS == code) { code = createChildLogicNode(pCxt, pSelect, createAggLogicNode, &pRoot); } + if (TSDB_CODE_SUCCESS == code) { + code = createChildLogicNode(pCxt, pSelect, createSortLogicNode, &pRoot); + } if (TSDB_CODE_SUCCESS == code) { code = createChildLogicNode(pCxt, pSelect, createProjectLogicNode, &pRoot); } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 26f0a0cdaa..dbaf64bdbf 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -31,6 +31,10 @@ typedef struct SPhysiPlanContext { } SPhysiPlanContext; static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) { + if (QUERY_NODE_ORDER_BY_EXPR == nodeType(pNode)) { + return getSlotKey(((SOrderByExprNode*)pNode)->pExpr, pStmtName, pKey); + } + if (QUERY_NODE_COLUMN == nodeType(pNode)) { SColumnNode* pCol = (SColumnNode*)pNode; if (NULL != pStmtName) { @@ -41,6 +45,7 @@ static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) { } return sprintf(pKey, "%s.%s", pCol->tableAlias, pCol->colName); } + if (NULL != pStmtName) { return sprintf(pKey, "%s.%s", pStmtName, ((SExprNode*)pNode)->aliasName); } @@ -815,6 +820,41 @@ static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildr return TSDB_CODE_FAILED; } +static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SSortLogicNode* pSortLogicNode, SPhysiNode** pPhyNode) { + SSortPhysiNode* pSort = (SSortPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pSortLogicNode, QUERY_NODE_PHYSICAL_PLAN_SORT); + if (NULL == pSort) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SNodeList* pPrecalcExprs = NULL; + SNodeList* pSortKeys = NULL; + int32_t code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys); + + SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); + // push down expression to pOutputDataBlockDesc of child node + if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) { + code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pSort->pExprs); + if (TSDB_CODE_SUCCESS == code) { + code = addDataBlockSlots(pCxt, pSort->pExprs, pChildTupe); + } + } + + if (TSDB_CODE_SUCCESS == code) { + code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortKeys, &pSort->pSortKeys); + if (TSDB_CODE_SUCCESS == code) { + code = addDataBlockSlots(pCxt, pSort->pSortKeys, pSort->node.pOutputDataBlockDesc); + } + } + + if (TSDB_CODE_SUCCESS == code) { + *pPhyNode = (SPhysiNode*)pSort; + } else { + nodesDestroyNode(pSort); + } + + return code; +} + static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan, SNodeList* pChildren, SPhysiNode** pPhyNode) { switch (nodeType(pLogicNode)) { case QUERY_NODE_LOGIC_PLAN_SCAN: @@ -829,6 +869,8 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode return createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicNode, pPhyNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: return createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicNode, pPhyNode); + case QUERY_NODE_LOGIC_PLAN_SORT: + return createSortPhysiNode(pCxt, pChildren, (SSortLogicNode*)pLogicNode, pPhyNode); default: break; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index d203d41cb0..a38fea04a5 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -65,7 +65,9 @@ static int32_t stsMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan) { SLogicNode* pSplitNode = stsMatchByNode(pSubplan->pNode); if (NULL != pSplitNode) { SStsInfo* pInfo = calloc(1, sizeof(SStsInfo)); - CHECK_ALLOC(pInfo, TSDB_CODE_OUT_OF_MEMORY); + if (NULL == pInfo) { + return TSDB_CODE_OUT_OF_MEMORY; + } pInfo->pScan = (SScanLogicNode*)pSplitNode; pInfo->pSubplan = pSubplan; pCxt->pInfo = pInfo; diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index c318652b76..ce9aca7f46 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -201,6 +201,19 @@ TEST_F(PlannerTest, sessionWindow) { ASSERT_TRUE(run()); } +TEST_F(PlannerTest, orderBy) { + setDatabase("root", "test"); + + bind("SELECT * FROM t1 order by c1"); + ASSERT_TRUE(run()); + + bind("SELECT c1 FROM t1 order by c2"); + ASSERT_TRUE(run()); + + bind("SELECT * FROM t1 order by c1 + 10, c2"); + ASSERT_TRUE(run()); +} + TEST_F(PlannerTest, showTables) { setDatabase("root", "test"); From 1d52dd9d430586df706e78484bdbd741d71877ac Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 26 Mar 2022 15:02:29 +0800 Subject: [PATCH 4/5] [td-13039] fix bug. --- source/libs/executor/inc/executorimpl.h | 5 ++- source/libs/executor/src/executorimpl.c | 59 ++++++++++++++++++++++--- 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c582873315..b6b5cdb727 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -415,6 +415,7 @@ typedef struct STableScanInfo { int32_t* rowCellInfoOffset; SExprInfo* pExpr; SSDataBlock block; + SArray* pColMatchInfo; int32_t numOfOutput; int64_t elapsedTime; int32_t prevGroupId; // previous table group id @@ -646,8 +647,8 @@ typedef struct SDistinctOperatorInfo { } SDistinctOperatorInfo; SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, - int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime, + int32_t reverseTime, SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a601b9b0b7..872ce173ad 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -66,6 +66,11 @@ typedef enum SResultTsInterpType { RESULT_ROW_END_INTERP = 2, } SResultTsInterpType; +typedef struct SColMatchInfo { + int32_t colId; + int32_t targetSlotId; +} SColMatchInfo; + #if 0 static UNUSED_FUNC void *u_malloc (size_t __size) { uint32_t v = taosRand(); @@ -2944,12 +2949,21 @@ int32_t loadDataBlock(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo, *status = BLK_DATA_ALL_NEEDED; - pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL); - if (pBlock->pDataBlock == NULL) { + SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL); + if (pCols == NULL) { return terrno; - } else { - return TSDB_CODE_SUCCESS; } + + int32_t numOfCols = pBlock->info.numOfCols; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pCols, i); + SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); + ASSERT(pColMatchInfo->colId == p->info.colId); + + taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p); + } + + return TSDB_CODE_SUCCESS; } int32_t loadDataBlockOnDemand(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { @@ -5374,7 +5388,8 @@ SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { return pResBlock; } -SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, + SExecTaskInfo* pTaskInfo) { assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); @@ -5387,12 +5402,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, return NULL; } + pInfo->block.pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); + for(int32_t i = 0; i < numOfOutput; ++i) { + SColumnInfoData idata = {0}; + taosArrayPush(pInfo->block.pDataBlock, &idata); + } + pInfo->pTsdbReadHandle = pTsdbReadHandle; pInfo->times = repeatTime; pInfo->reverseTimes = reverseTime; pInfo->order = order; pInfo->current = 0; pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColMatchInfo; pOperator->name = "TableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->blockingOptr = false; @@ -8497,6 +8519,7 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo); static SArray* extractScanColumnId(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList); +static SArray* extractColMatchInfo(SNodeList* pNodeList); SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { @@ -8505,7 +8528,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); - return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); + SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols); + + return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pColList, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc); @@ -8683,6 +8708,28 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { return pList; } +SArray* extractColMatchInfo(SNodeList* pNodeList) { + size_t numOfCols = LIST_LENGTH(pNodeList); + SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo)); + if (pList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for(int32_t i = 0; i < numOfCols; ++i) { + STargetNode* pNode = (STargetNode*) nodesListGetNode(pNodeList, i); + SColumnNode* pColNode = (SColumnNode*) pNode->pExpr; + + SColMatchInfo c = {0}; + c.colId = pColNode->colId; + c.targetSlotId = pNode->slotId; + + taosArrayPush(pList, &c); + } + + return pList; +} + int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) { int32_t code = 0; if (tableType == TSDB_SUPER_TABLE) { From 85f87ae246900401712c0c83e3d15c266b55b305 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 26 Mar 2022 05:45:48 -0400 Subject: [PATCH 5/5] bugfix --- .gitignore | 1 + include/libs/nodes/plannodes.h | 2 + include/libs/nodes/querynodes.h | 3 +- source/libs/nodes/src/nodesCloneFuncs.c | 16 ++++---- source/libs/nodes/src/nodesCodeFuncs.c | 7 ++++ source/libs/nodes/src/nodesTraverseFuncs.c | 6 +++ source/libs/parser/inc/parUtil.h | 2 + source/libs/parser/src/parAstCreater.c | 7 ++++ source/libs/parser/src/parTranslater.c | 4 ++ source/libs/planner/src/planLogicCreater.c | 8 +++- source/libs/planner/src/planPhysiCreater.c | 46 +++++++++++++++++++--- 11 files changed, 88 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index 1bfbf00cd5..b62bd62d9c 100644 --- a/.gitignore +++ b/.gitignore @@ -89,6 +89,7 @@ tests/examples/JDBC/JDBCDemo/.project tests/examples/JDBC/JDBCDemo/.settings/ source/libs/parser/inc/sql.* tests/script/tmqResult.txt +tests/tmqResult.txt # Emacs # -*- mode: gitignore; -*- diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 7b0a563907..c87dd0ec8b 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -97,6 +97,7 @@ typedef struct SWindowLogicNode { int8_t slidingUnit; SFillNode* pFill; int64_t sessionGap; + SNode* pTspk; } SWindowLogicNode; typedef struct SSortLogicNode { @@ -228,6 +229,7 @@ typedef struct SWinodwPhysiNode { typedef struct SIntervalPhysiNode { SWinodwPhysiNode window; + SNode* pTspk; // timestamp primary key int64_t interval; int64_t offset; int64_t sliding; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 5609bb4269..1ba7ae10d5 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -191,12 +191,13 @@ typedef struct SStateWindowNode { typedef struct SSessionWindowNode { ENodeType type; // QUERY_NODE_SESSION_WINDOW - SNode* pCol; + SNode* pCol; // timestamp primary key SNode* pGap; // gap between two session window(in microseconds) } SSessionWindowNode; typedef struct SIntervalWindowNode { ENodeType type; // QUERY_NODE_INTERVAL_WINDOW + SNode* pCol; // timestamp primary key SNode* pInterval; // SValueNode SNode* pOffset; // SValueNode SNode* pSliding; // SValueNode diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 161427b4b5..17c66f6ed7 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -278,16 +278,18 @@ static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNo } static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pDst) { + COPY_ALL_SCALAR_FIELDS; COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); - COPY_SCALAR_FIELD(winType); + // COPY_SCALAR_FIELD(winType); CLONE_NODE_LIST_FIELD(pFuncs); - COPY_SCALAR_FIELD(interval); - COPY_SCALAR_FIELD(offset); - COPY_SCALAR_FIELD(sliding); - COPY_SCALAR_FIELD(intervalUnit); - COPY_SCALAR_FIELD(slidingUnit); + // COPY_SCALAR_FIELD(interval); + // COPY_SCALAR_FIELD(offset); + // COPY_SCALAR_FIELD(sliding); + // COPY_SCALAR_FIELD(intervalUnit); + // COPY_SCALAR_FIELD(slidingUnit); CLONE_NODE_FIELD(pFill); - COPY_SCALAR_FIELD(sessionGap); + // COPY_SCALAR_FIELD(sessionGap); + CLONE_NODE_FIELD(pTspk); return (SNode*)pDst; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 4225b5c9a8..849764c995 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -988,6 +988,7 @@ static const char* jkIntervalPhysiPlanSliding = "Sliding"; static const char* jkIntervalPhysiPlanIntervalUnit = "intervalUnit"; static const char* jkIntervalPhysiPlanSlidingUnit = "slidingUnit"; static const char* jkIntervalPhysiPlanFill = "Fill"; +static const char* jkIntervalPhysiPlanTsPk = "TsPk"; static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) { const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj; @@ -1011,6 +1012,9 @@ static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkIntervalPhysiPlanFill, nodeToJson, pNode->pFill); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkIntervalPhysiPlanTsPk, nodeToJson, pNode->pTspk); + } return code; } @@ -1037,6 +1041,9 @@ static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkIntervalPhysiPlanFill, (SNode**)&pNode->pFill); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkIntervalPhysiPlanTsPk, (SNode**)&pNode->pTspk); + } return code; } diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index 3af5a6d8cc..7eaa049946 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -99,6 +99,9 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker if (DEAL_RES_ERROR != res) { res = walkNode(pInterval->pFill, order, walker, pContext); } + if (DEAL_RES_ERROR != res) { + res = walkNode(pInterval->pCol, order, walker, pContext); + } break; } case QUERY_NODE_NODE_LIST: @@ -225,6 +228,9 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit if (DEAL_RES_ERROR != res) { res = rewriteNode(&(pInterval->pFill), order, rewriter, pContext); } + if (DEAL_RES_ERROR != res) { + res = rewriteNode(&(pInterval->pCol), order, rewriter, pContext); + } break; } case QUERY_NODE_NODE_LIST: diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index 171b406e18..742ab303d3 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -30,6 +30,8 @@ extern "C" { #define parserDebug(param, ...) qDebug("PARSER: " param, __VA_ARGS__) #define parserTrace(param, ...) qTrace("PARSER: " param, __VA_ARGS__) +#define PK_TS_COL_INTERNAL_NAME "_rowts" + typedef struct SMsgBuf { int32_t len; char *buf; diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 5b2f36878b..d9ee1309ad 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -702,6 +702,13 @@ SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pCol) { SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding, SNode* pFill) { SIntervalWindowNode* interval = (SIntervalWindowNode*)nodesMakeNode(QUERY_NODE_INTERVAL_WINDOW); CHECK_OUT_OF_MEM(interval); + interval->pCol = nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == interval->pCol) { + nodesDestroyNode(interval); + CHECK_OUT_OF_MEM(interval->pCol); + } + ((SColumnNode*)interval->pCol)->colId = PRIMARYKEY_TIMESTAMP_COL_ID; + strcpy(((SColumnNode*)interval->pCol)->colName, PK_TS_COL_INTERNAL_NAME); interval->pInterval = pInterval; interval->pOffset = pOffset; interval->pSliding = pSliding; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 19c6cb3f27..3e88ce0831 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -256,6 +256,10 @@ static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) { bool found = false; if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) { const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta; + if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && 0 == strcmp(pCol->colName, PK_TS_COL_INTERNAL_NAME)) { + setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema, false, pCol); + return true; + } int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns; for (int32_t i = 0; i < nums; ++i) { if (0 == strcmp(pCol->colName, pMeta->schema[i].name)) { diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 7e3b67f809..0eb4ec23fa 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -88,7 +88,7 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) { } static int32_t rewriteExpr(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) { - static int32_t rewriteId = 1; + static int32_t rewriteId = 1; // todo modify SNameExprCxt nameCxt = { .rewriteId = rewriteId }; nodesWalkList(pExprs, doNameExpr, &nameCxt); SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs }; @@ -461,6 +461,12 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval); pWindow->slidingUnit = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit); + pWindow->pTspk = nodesCloneNode(pInterval->pCol); + if (NULL == pWindow->pTspk) { + nodesDestroyNode(pWindow); + return TSDB_CODE_OUT_OF_MEMORY; + } + if (NULL != pInterval->pFill) { pWindow->pFill = nodesCloneNode(pInterval->pFill); if (NULL == pWindow->pFill) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index dbaf64bdbf..fe8bafbed3 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -17,9 +17,14 @@ #include "functionMgt.h" +typedef struct SSlotIdInfo { + int16_t slotId; + bool set; +} SSlotIdInfo; + typedef struct SSlotIndex { int16_t dataBlockId; - int16_t slotId; + SArray* pSlotIdsInfo; // duplicate name slot } SSlotIndex; typedef struct SPhysiPlanContext { @@ -79,7 +84,19 @@ static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, S } static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) { - SSlotIndex index = { .dataBlockId = dataBlockId, .slotId = slotId }; + SSlotIndex* pIndex = taosHashGet(pHash, pName, len); + if (NULL != pIndex) { + SSlotIdInfo info = { .slotId = slotId, .set = false }; + taosArrayPush(pIndex->pSlotIdsInfo, &info); + return TSDB_CODE_SUCCESS; + } + + SSlotIndex index = { .dataBlockId = dataBlockId, .pSlotIdsInfo = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SSlotIdInfo)) }; + if (NULL == index.pSlotIdsInfo) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SSlotIdInfo info = { .slotId = slotId, .set = false }; + taosArrayPush(index.pSlotIdsInfo, &info); return taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex)); } @@ -90,7 +107,7 @@ static int32_t putSlotToHash(int16_t dataBlockId, int16_t slotId, SNode* pNode, } static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId, SHashObj** pDescHash) { - SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (NULL == pHash) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -149,6 +166,18 @@ static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SD return code; } +static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) { + int32_t size = taosArrayGetSize(pSlotIdsInfo); + for (int32_t i = 0; i < size; ++i) { + SSlotIdInfo* pInfo = taosArrayGet(pSlotIdsInfo, i); + if (!pInfo->set) { + pInfo->set = true; + return pInfo->slotId; + } + } + return ((SSlotIdInfo*)taosArrayGet(pSlotIdsInfo, 0))->slotId; +} + static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, const char* pStmtName, bool output) { int32_t code = TSDB_CODE_SUCCESS; SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId); @@ -167,7 +196,7 @@ static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, slotId = nextSlotId; ++nextSlotId; } else { - slotId = pIndex->slotId; + slotId = getUnsetSlotId(pIndex->pSlotIdsInfo); } if (TSDB_CODE_SUCCESS == code) { @@ -217,7 +246,7 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) { return DEAL_RES_ERROR; } ((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId; - ((SColumnNode*)pNode)->slotId = pIndex->slotId; + ((SColumnNode*)pNode)->slotId = ((SSlotIdInfo*)taosArrayGet(pIndex->pSlotIdsInfo, 0))->slotId; return DEAL_RES_IGNORE_CHILD; } return DEAL_RES_CONTINUE; @@ -792,6 +821,13 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil return TSDB_CODE_OUT_OF_MEMORY; } + SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); + int32_t code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pInterval->pTspk); + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode(pInterval); + return code; + } + return createWindowPhysiNodeFinalize(pCxt, pChildren, &pInterval->window, pWindowLogicNode, pPhyNode); }