feat: find pk col from interp func node for interp operator

This commit is contained in:
slzhou 2024-03-27 16:26:33 +08:00
parent c4ecbbc181
commit 43cefb9d55
1 changed files with 22 additions and 0 deletions

View File

@ -46,6 +46,8 @@ typedef struct STimeSliceOperatorInfo {
SSDataBlock* pNextGroupRes; SSDataBlock* pNextGroupRes;
SSDataBlock* pRemainRes; // save block unfinished processing SSDataBlock* pRemainRes; // save block unfinished processing
int32_t remainIndex; // the remaining index in the block to be processed int32_t remainIndex; // the remaining index in the block to be processed
bool hasPk;
SColumn pkCol;
} STimeSliceOperatorInfo; } STimeSliceOperatorInfo;
static void destroyTimeSliceOperatorInfo(void* param); static void destroyTimeSliceOperatorInfo(void* param);
@ -953,6 +955,25 @@ _finished:
return pResBlock->info.rows == 0 ? NULL : pResBlock; return pResBlock->info.rows == 0 ? NULL : pResBlock;
} }
static int32_t extractPkColumnFromFuncs(SNodeList* pFuncs, bool* pHasPk, SColumn* pPkColumn) {
SNode* pNode;
FOREACH(pNode, pFuncs) {
if ((nodeType(pNode) == QUERY_NODE_TARGET) &&
(nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
if (fmIsInterpFunc(pFunc->funcId) && pFunc->hasPk) {
SNode* pNode2 = (pFunc->pParameterList->pTail->pNode);
if ((nodeType(pNode2) == QUERY_NODE_COLUMN) && ((SColumnNode*)pNode2)->isPk) {
*pHasPk = true;
*pPkColumn = extractColumnFromColumnNode((SColumnNode*)pNode2);
break;
}
}
}
}
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) {
STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
@ -985,6 +1006,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
} }
pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries); pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries);
extractPkColumnFromFuncs(pInterpPhyNode->pFuncs, &pInfo->hasPk, &pInfo->pkCol);
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode); pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);