diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 3e95f1e286..1afec35c3c 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -128,6 +128,7 @@ typedef struct SScanLogicNode { bool paraTablesSort; // for table merge scan bool smallDataTsSort; // disable row id sort for table merge scan bool needSplit; + bool noPseudoRefAfterGrp; // no pseudo columns referenced ater group/partition clause } SScanLogicNode; typedef struct SJoinLogicNode { diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 649a7a4524..7349feb70f 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -398,16 +398,16 @@ static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { // check for tag values if (pInfo->pRes->info.rows > 0) { - if (pInfo->pseudoExprSup.numOfExprs > 0) { - SExprSupp* pSup = &pInfo->pseudoExprSup; + SExprSupp* pSup = &pInfo->pseudoExprSup; - STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pList)[0]; - pInfo->pRes->info.id.groupId = pKeyInfo->groupId; + STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pList)[0]; + pInfo->pRes->info.id.groupId = pKeyInfo->groupId; - if (taosArrayGetSize(pInfo->pUidList) > 0) { - void* pUid = taosArrayGet(pInfo->pUidList, 0); - QUERY_CHECK_NULL(pUid, code, lino, _end, terrno); - pInfo->pRes->info.id.uid = *(tb_uid_t*)pUid; + if (taosArrayGetSize(pInfo->pUidList) > 0) { + void* pUid = taosArrayGet(pInfo->pUidList, 0); + QUERY_CHECK_NULL(pUid, code, lino, _end, terrno); + pInfo->pRes->info.id.uid = *(tb_uid_t*)pUid; + if (pInfo->pseudoExprSup.numOfExprs > 0) { code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows, pTaskInfo, NULL); QUERY_CHECK_CODE(code, lino, _end); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index bc741b46ce..0e7a719c2c 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -510,6 +510,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { COPY_SCALAR_FIELD(paraTablesSort); COPY_SCALAR_FIELD(smallDataTsSort); COPY_SCALAR_FIELD(needSplit); + COPY_SCALAR_FIELD(noPseudoRefAfterGrp); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index c3fd9cdcf2..3f064f2b66 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -406,6 +406,47 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT static bool needScanDefaultCol(EScanType scanType) { return SCAN_TYPE_TABLE_COUNT != scanType; } +static int32_t updateScanNoPseudoRefAfterGrp(SSelectStmt* pSelect, SScanLogicNode* pScan, SRealTableNode* pRealTable) { + if (NULL == pScan->pScanPseudoCols || pScan->pScanPseudoCols->length <= 0 || NULL != pSelect->pTags || NULL != pSelect->pSubtable) { + return TSDB_CODE_SUCCESS; + } + + SNodeList* pList = NULL; + int32_t code = 0; + if (NULL == pSelect->pPartitionByList || pSelect->pPartitionByList->length <= 0) { + if (NULL == pSelect->pGroupByList || pSelect->pGroupByList->length <= 0) { + return TSDB_CODE_SUCCESS; + } + + code = nodesCollectColumns(pSelect, SQL_CLAUSE_GROUP_BY, pRealTable->table.tableAlias, COLLECT_COL_TYPE_TAG, + &pList); + if (TSDB_CODE_SUCCESS == code) { + code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, pRealTable->table.tableAlias, fmIsScanPseudoColumnFunc, + &pList); + } + if (TSDB_CODE_SUCCESS == code && (NULL == pList || pList->length <= 0)) { + pScan->noPseudoRefAfterGrp = true; + } + goto _return; + } + + code = nodesCollectColumns(pSelect, SQL_CLAUSE_PARTITION_BY, pRealTable->table.tableAlias, COLLECT_COL_TYPE_TAG, + &pList); + if (TSDB_CODE_SUCCESS == code) { + code = nodesCollectFuncs(pSelect, SQL_CLAUSE_PARTITION_BY, pRealTable->table.tableAlias, fmIsScanPseudoColumnFunc, + &pList); + } + + if (TSDB_CODE_SUCCESS == code && (NULL == pList || pList->length <= 0)) { + pScan->noPseudoRefAfterGrp = true; + } + +_return: + + nodesDestroyList(pList); + return code; +} + static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable, SLogicNode** pLogicNode) { SScanLogicNode* pScan = NULL; @@ -437,7 +478,13 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect &pScan->pScanPseudoCols); } - pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType, pSelect->tagScan); + if (TSDB_CODE_SUCCESS == code) { + code = updateScanNoPseudoRefAfterGrp(pSelect, pScan, pRealTable); + } + + if (TSDB_CODE_SUCCESS == code) { + pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType, pSelect->tagScan); + } // rewrite the expression in subsequent clauses if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 38f2e5024f..e8a8e6889a 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -93,6 +93,12 @@ typedef struct SCpdCollectTableColCxt { int32_t errCode; } SCpdCollectTableColCxt; +typedef struct SCollectColsCxt { + SHashObj* pColHash; + int32_t errCode; +} SCollectColsCxt; + + typedef enum ECondAction { COND_ACTION_STAY = 1, COND_ACTION_PUSH_JOIN, @@ -3302,6 +3308,70 @@ static int32_t partTagsRewriteGroupTagsToFuncs(SNodeList* pGroupTags, int32_t st return code; } +static EDealRes partTagsCollectColsNodes(SNode* pNode, void* pContext) { + SCollectColsCxt* pCxt = pContext; + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + SColumnNode* pCol = (SColumnNode*)pNode; + if (NULL == taosHashGet(pCxt->pColHash, pCol->colName, strlen(pCol->colName))) { + pCxt->errCode = taosHashPut(pCxt->pColHash, pCol->colName, strlen(pCol->colName), NULL, 0); + } + } + + return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_CONTINUE : DEAL_RES_ERROR); +} + + +static bool partTagsIsScanPseudoColsInConds(SScanLogicNode* pScan) { + SCollectColsCxt cxt = { + .errCode = TSDB_CODE_SUCCESS, + .pColHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK)}; + if (NULL == cxt.pColHash) { + return true; + } + + nodesWalkExpr(pScan->node.pConditions, partTagsCollectColsNodes, &cxt); + if (cxt.errCode) { + taosHashCleanup(cxt.pColHash); + return true; + } + + SNode* pNode = NULL; + FOREACH(pNode, pScan->pScanPseudoCols) { + if (taosHashGet(cxt.pColHash, ((SExprNode*)pNode)->aliasName, strlen(((SExprNode*)pNode)->aliasName))) { + taosHashCleanup(cxt.pColHash); + return true; + } + } + + taosHashCleanup(cxt.pColHash); + return false; +} + +static int32_t partTagsOptRemovePseudoCols(SScanLogicNode* pScan) { + if (!pScan->noPseudoRefAfterGrp || NULL == pScan->pScanPseudoCols || pScan->pScanPseudoCols->length <= 0) { + return TSDB_CODE_SUCCESS; + } + + if (pScan->node.pConditions && partTagsIsScanPseudoColsInConds(pScan)) { + return TSDB_CODE_SUCCESS; + } + + SNode* pNode = NULL, *pTarget = NULL; + FOREACH(pNode, pScan->pScanPseudoCols) { + FOREACH(pTarget, pScan->node.pTargets) { + if (0 == strcmp(((SExprNode*)pNode)->aliasName, ((SColumnNode*)pTarget)->colName)) { + ERASE_NODE(pScan->node.pTargets); + break; + } + } + } + + nodesDestroyList(pScan->pScanPseudoCols); + pScan->pScanPseudoCols = NULL; + + return TSDB_CODE_SUCCESS; +} + static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized, NULL); if (NULL == pNode) { @@ -3362,6 +3432,9 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub code = partTagsRewriteGroupTagsToFuncs(pScan->pGroupTags, start, pAgg); } } + if (TSDB_CODE_SUCCESS == code) { + code = partTagsOptRemovePseudoCols(pScan); + } if (TSDB_CODE_SUCCESS == code) { code = partTagsOptRebuildTbanme(pScan->pGroupTags); } diff --git a/source/libs/scheduler/src/schStatus.c b/source/libs/scheduler/src/schStatus.c index f24ee74101..9792af22f6 100644 --- a/source/libs/scheduler/src/schStatus.c +++ b/source/libs/scheduler/src/schStatus.c @@ -22,9 +22,11 @@ #include "trpc.h" int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) { - int32_t code = 0; - SCH_ERR_JRET(schUpdateJobStatus(pJob, status)); - + int32_t code = schUpdateJobStatus(pJob, status); + if (TSDB_CODE_SUCCESS != code) { + SCH_ERR_JRET((param && *(int32_t*)param) ? *(int32_t*)param : code); + } + switch (status) { case JOB_TASK_STATUS_INIT: break;