From 43cefb9d55bd615b0fbcecf40f79dafc1b60bd65 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 27 Mar 2024 16:26:33 +0800 Subject: [PATCH] feat: find pk col from interp func node for interp operator --- source/libs/executor/src/timesliceoperator.c | 22 ++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index abaebb1543..4b86f85d53 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -46,6 +46,8 @@ typedef struct STimeSliceOperatorInfo { SSDataBlock* pNextGroupRes; SSDataBlock* pRemainRes; // save block unfinished processing int32_t remainIndex; // the remaining index in the block to be processed + bool hasPk; + SColumn pkCol; } STimeSliceOperatorInfo; static void destroyTimeSliceOperatorInfo(void* param); @@ -953,6 +955,25 @@ _finished: return pResBlock->info.rows == 0 ? NULL : pResBlock; } +static int32_t extractPkColumnFromFuncs(SNodeList* pFuncs, bool* pHasPk, SColumn* pPkColumn) { + SNode* pNode; + FOREACH(pNode, pFuncs) { + if ((nodeType(pNode) == QUERY_NODE_TARGET) && + (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) { + SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr; + if (fmIsInterpFunc(pFunc->funcId) && pFunc->hasPk) { + SNode* pNode2 = (pFunc->pParameterList->pTail->pNode); + if ((nodeType(pNode2) == QUERY_NODE_COLUMN) && ((SColumnNode*)pNode2)->isPk) { + *pHasPk = true; + *pPkColumn = extractColumnFromColumnNode((SColumnNode*)pNode2); + break; + } + } + } + } + return TSDB_CODE_SUCCESS; +} + SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -985,6 +1006,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode } pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries); + extractPkColumnFromFuncs(pInterpPhyNode->pFuncs, &pInfo->hasPk, &pInfo->pkCol); pInfo->fillType = convertFillType(pInterpPhyNode->fillMode); initResultSizeInfo(&pOperator->resultInfo, 4096);