From 42df9f587f1e2f8eae33230e0f648b4e48716f49 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 28 Apr 2023 16:41:36 +0800 Subject: [PATCH] fix group_key not set properly --- source/libs/executor/src/timesliceoperator.c | 41 ++++++++++++++++---- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 7b0da0608a..310c3c5674 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -39,6 +39,7 @@ typedef struct STimeSliceOperatorInfo { int64_t prevTs; bool prevTsSet; uint64_t groupId; + SSDataBlock* pCurrentGroupRes; SSDataBlock* pNextGroupRes; } STimeSliceOperatorInfo; @@ -197,18 +198,24 @@ static bool isInterpFunc(SExprInfo* pExprInfo) { return (strcasecmp(name, "interp") == 0); } +static bool isGroupKeyFunc(SExprInfo* pExprInfo) { + char *name = pExprInfo->pExpr->_function.functionName; + return (strcasecmp(name, "_group_key") == 0); +} + static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, - bool beforeTs) { + SSDataBlock* pSrcBlock, int32_t index, bool beforeTs) { int32_t rows = pResBlock->info.rows; timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock); // todo set the correct primary timestamp column + // output the result bool hasInterp = true; for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; - int32_t dstSlot = pExprInfo->base.resSchema.slotId; + int32_t dstSlot = pExprInfo->base.resSchema.slotId; SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); if (isIrowtsPseudoColumn(pExprInfo)) { @@ -219,6 +226,18 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp colDataAppend(pDst, pResBlock->info.rows, (char*)&isFilled, false); continue; } else if (!isInterpFunc(pExprInfo)) { + if (isGroupKeyFunc(pExprInfo)) { + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); + + if (colDataIsNull_s(pSrc, index)) { + colDataSetNULL(pDst, pResBlock->info.rows); + continue; + } + + char* v = colDataGetData(pSrc, index); + colDataSetVal(pDst, pResBlock->info.rows, v, false); + } continue; } @@ -345,7 +364,7 @@ static void addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* bool isFilled = false; colDataSetVal(pDst, pResBlock->info.rows, (char*)&isFilled, false); } else { - int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); if (colDataIsNull_s(pSrc, index)) { @@ -559,7 +578,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); if (nextTs > pSliceInfo->current) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { - if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false) && + if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false) && pSliceInfo->fillType == TSDB_FILL_LINEAR) { break; } else { @@ -583,7 +602,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS doKeepLinearInfo(pSliceInfo, pBlock, i); while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { - if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, true) && + if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true) && pSliceInfo->fillType == TSDB_FILL_LINEAR) { break; } else { @@ -608,13 +627,14 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS } } -static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator) { +static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, SSDataBlock* pSrcBlock, + int32_t index) { SSDataBlock* pResBlock = pSliceInfo->pRes; SInterval* pInterval = &pSliceInfo->interval; while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && pSliceInfo->fillType != TSDB_FILL_LINEAR) { - genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false); + genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pSrcBlock, index, false); pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); } @@ -645,7 +665,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { while (1) { if (pSliceInfo->pNextGroupRes != NULL) { + setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true); doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo); + pSliceInfo->pCurrentGroupRes = pSliceInfo->pNextGroupRes; pSliceInfo->pNextGroupRes = NULL; } @@ -679,11 +701,13 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo); + pSliceInfo->pCurrentGroupRes = pBlock; } // check if need to interpolate after last datablock // except for fill(next), fill(linear) - genInterpAfterDataBlock(pSliceInfo, pOperator); + genInterpAfterDataBlock(pSliceInfo, pOperator, pSliceInfo->pCurrentGroupRes, + pSliceInfo->pCurrentGroupRes->info.rows - 1); doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); if (pOperator->status == OP_EXEC_DONE) { @@ -748,6 +772,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->prevTs = 0; pInfo->groupId = 0; pInfo->pNextGroupRes = NULL; + pInfo->pCurrentGroupRes = NULL; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;