From df674440bf8b40170939cde659c64af9c4bcd3ef Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 18 Feb 2025 19:27:47 +0800 Subject: [PATCH] fix: last row scan and tag in conds issues --- source/libs/executor/src/cachescanoperator.c | 16 +++---- source/libs/planner/src/planOptimizer.c | 49 ++++++++++++++++++++ source/libs/scheduler/src/schStatus.c | 8 ++-- 3 files changed, 62 insertions(+), 11 deletions(-) diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 649a7a4524..7349feb70f 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -398,16 +398,16 @@ static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { // check for tag values if (pInfo->pRes->info.rows > 0) { - if (pInfo->pseudoExprSup.numOfExprs > 0) { - SExprSupp* pSup = &pInfo->pseudoExprSup; + SExprSupp* pSup = &pInfo->pseudoExprSup; - STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pList)[0]; - pInfo->pRes->info.id.groupId = pKeyInfo->groupId; + STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pList)[0]; + pInfo->pRes->info.id.groupId = pKeyInfo->groupId; - if (taosArrayGetSize(pInfo->pUidList) > 0) { - void* pUid = taosArrayGet(pInfo->pUidList, 0); - QUERY_CHECK_NULL(pUid, code, lino, _end, terrno); - pInfo->pRes->info.id.uid = *(tb_uid_t*)pUid; + if (taosArrayGetSize(pInfo->pUidList) > 0) { + void* pUid = taosArrayGet(pInfo->pUidList, 0); + QUERY_CHECK_NULL(pUid, code, lino, _end, terrno); + pInfo->pRes->info.id.uid = *(tb_uid_t*)pUid; + if (pInfo->pseudoExprSup.numOfExprs > 0) { code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows, pTaskInfo, NULL); QUERY_CHECK_CODE(code, lino, _end); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index a3fa5a3d7c..e8a8e6889a 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -93,6 +93,12 @@ typedef struct SCpdCollectTableColCxt { int32_t errCode; } SCpdCollectTableColCxt; +typedef struct SCollectColsCxt { + SHashObj* pColHash; + int32_t errCode; +} SCollectColsCxt; + + typedef enum ECondAction { COND_ACTION_STAY = 1, COND_ACTION_PUSH_JOIN, @@ -3302,11 +3308,54 @@ static int32_t partTagsRewriteGroupTagsToFuncs(SNodeList* pGroupTags, int32_t st return code; } +static EDealRes partTagsCollectColsNodes(SNode* pNode, void* pContext) { + SCollectColsCxt* pCxt = pContext; + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + SColumnNode* pCol = (SColumnNode*)pNode; + if (NULL == taosHashGet(pCxt->pColHash, pCol->colName, strlen(pCol->colName))) { + pCxt->errCode = taosHashPut(pCxt->pColHash, pCol->colName, strlen(pCol->colName), NULL, 0); + } + } + + return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_CONTINUE : DEAL_RES_ERROR); +} + + +static bool partTagsIsScanPseudoColsInConds(SScanLogicNode* pScan) { + SCollectColsCxt cxt = { + .errCode = TSDB_CODE_SUCCESS, + .pColHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK)}; + if (NULL == cxt.pColHash) { + return true; + } + + nodesWalkExpr(pScan->node.pConditions, partTagsCollectColsNodes, &cxt); + if (cxt.errCode) { + taosHashCleanup(cxt.pColHash); + return true; + } + + SNode* pNode = NULL; + FOREACH(pNode, pScan->pScanPseudoCols) { + if (taosHashGet(cxt.pColHash, ((SExprNode*)pNode)->aliasName, strlen(((SExprNode*)pNode)->aliasName))) { + taosHashCleanup(cxt.pColHash); + return true; + } + } + + taosHashCleanup(cxt.pColHash); + return false; +} + static int32_t partTagsOptRemovePseudoCols(SScanLogicNode* pScan) { if (!pScan->noPseudoRefAfterGrp || NULL == pScan->pScanPseudoCols || pScan->pScanPseudoCols->length <= 0) { return TSDB_CODE_SUCCESS; } + if (pScan->node.pConditions && partTagsIsScanPseudoColsInConds(pScan)) { + return TSDB_CODE_SUCCESS; + } + SNode* pNode = NULL, *pTarget = NULL; FOREACH(pNode, pScan->pScanPseudoCols) { FOREACH(pTarget, pScan->node.pTargets) { diff --git a/source/libs/scheduler/src/schStatus.c b/source/libs/scheduler/src/schStatus.c index f24ee74101..9792af22f6 100644 --- a/source/libs/scheduler/src/schStatus.c +++ b/source/libs/scheduler/src/schStatus.c @@ -22,9 +22,11 @@ #include "trpc.h" int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) { - int32_t code = 0; - SCH_ERR_JRET(schUpdateJobStatus(pJob, status)); - + int32_t code = schUpdateJobStatus(pJob, status); + if (TSDB_CODE_SUCCESS != code) { + SCH_ERR_JRET((param && *(int32_t*)param) ? *(int32_t*)param : code); + } + switch (status) { case JOB_TASK_STATUS_INIT: break;