diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 7a4b9014bc..78aabb5c26 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -39,7 +39,7 @@ typedef struct STimeSliceOperatorInfo { int64_t prevTs; bool prevTsSet; uint64_t groupId; - SSDataBlock* pPrevGroupRes; + SGroupKeys* pPrevGroupKey; SSDataBlock* pNextGroupRes; } STimeSliceOperatorInfo; @@ -227,16 +227,26 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp continue; } else if (!isInterpFunc(pExprInfo)) { if (isGroupKeyFunc(pExprInfo)) { - int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; - SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); + if (pSrcBlock != NULL) { + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); - if (colDataIsNull_s(pSrc, index)) { - colDataSetNULL(pDst, pResBlock->info.rows); - continue; + if (colDataIsNull_s(pSrc, index)) { + colDataSetNULL(pDst, pResBlock->info.rows); + continue; + } + + char* v = colDataGetData(pSrc, index); + colDataSetVal(pDst, pResBlock->info.rows, v, false); + } else { + // use stored group key + SGroupKeys* pkey = pSliceInfo->pPrevGroupKey; + if (pkey->isNull == false) { + colDataSetVal(pDst, rows, pkey->pData, false); + } else { + colDataSetNULL(pDst, rows); + } } - - char* v = colDataGetData(pSrc, index); - colDataSetVal(pDst, pResBlock->info.rows, v, false); } continue; } @@ -464,7 +474,31 @@ static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB return TSDB_CODE_SUCCESS; } -static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { +static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExprSup) { + if (pInfo->pPrevGroupKey != NULL) { + return TSDB_CODE_SUCCESS; + } + + pInfo->pPrevGroupKey = taosMemoryCalloc(1, sizeof(SGroupKeys)); + if (pInfo->pPrevGroupKey == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) { + SExprInfo* pExprInfo = &pExprSup->pExprInfo[i]; + + if (isGroupKeyFunc(pExprInfo)) { + pInfo->pPrevGroupKey->bytes = pExprInfo->base.resSchema.bytes; + pInfo->pPrevGroupKey->type = pExprInfo->base.resSchema.type; + pInfo->pPrevGroupKey->isNull = false; + pInfo->pPrevGroupKey->pData = taosMemoryCalloc(1, pInfo->pPrevGroupKey->bytes); + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock, SExprSupp* pExprSup) { int32_t code; code = initPrevRowsKeeper(pInfo, pBlock); if (code != TSDB_CODE_SUCCESS) { @@ -481,6 +515,12 @@ static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock return TSDB_CODE_FAILED; } + code = initGroupKeyKeeper(pInfo, pExprSup); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_FAILED; + } + + return TSDB_CODE_SUCCESS; } @@ -627,22 +667,42 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS } } -static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, SSDataBlock* pSrcBlock, - int32_t index) { +static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) { SSDataBlock* pResBlock = pSliceInfo->pRes; SInterval* pInterval = &pSliceInfo->interval; while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && pSliceInfo->fillType != TSDB_FILL_LINEAR) { - genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pSrcBlock, index, false); + genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false); pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); } } -static void copyPrevGroupDataBlock(SSDataBlock* pDstBlock, SSDataBlock* pSrcBlock) { - blockDataCleanup(pDstBlock); - assignOneDataBlock(pDstBlock, pSrcBlock); +static void copyPrevGroupKey(SExprSupp* pExprSup, SGroupKeys* pGroupKey, SSDataBlock* pSrcBlock) { + for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { + SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; + + if (isGroupKeyFunc(pExprInfo)) { + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); + + if (colDataIsNull_s(pSrc, 0)) { + pGroupKey->isNull = true; + break; + } + + char* v = colDataGetData(pSrc, 0); + if (IS_VAR_DATA_TYPE(pGroupKey->type)) { + memcpy(pGroupKey->pData, v, varDataTLen(v)); + } else { + memcpy(pGroupKey->pData, v, pGroupKey->bytes); + } + + pGroupKey->isNull = false; + break; + } + } } static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) { @@ -672,7 +732,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pSliceInfo->pNextGroupRes != NULL) { setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true); doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo); - copyPrevGroupDataBlock(pSliceInfo->pPrevGroupRes, pSliceInfo->pNextGroupRes); + copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pSliceInfo->pNextGroupRes); pSliceInfo->pNextGroupRes = NULL; } @@ -698,7 +758,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); } - int32_t code = initKeeperInfo(pSliceInfo, pBlock); + int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -706,13 +766,12 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo); - copyPrevGroupDataBlock(pSliceInfo->pPrevGroupRes, pBlock); + copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock); } // check if need to interpolate after last datablock // except for fill(next), fill(linear) - genInterpAfterDataBlock(pSliceInfo, pOperator, pSliceInfo->pPrevGroupRes, - pSliceInfo->pPrevGroupRes->info.rows - 1); + genInterpAfterDataBlock(pSliceInfo, pOperator, 0); doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); if (pOperator->status == OP_EXEC_DONE) { @@ -776,7 +835,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->prevTsSet = false; pInfo->prevTs = 0; pInfo->groupId = 0; - pInfo->pPrevGroupRes = createDataBlock(); + pInfo->pPrevGroupKey = NULL; pInfo->pNextGroupRes = NULL; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { @@ -806,7 +865,6 @@ void destroyTimeSliceOperatorInfo(void* param) { STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param; pInfo->pRes = blockDataDestroy(pInfo->pRes); - pInfo->pPrevGroupRes = blockDataDestroy(pInfo->pPrevGroupRes); for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) { SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i); @@ -826,6 +884,10 @@ void destroyTimeSliceOperatorInfo(void* param) { taosMemoryFree(pKey->end.val); } taosArrayDestroy(pInfo->pLinearInfo); + + taosMemoryFree(pInfo->pPrevGroupKey->pData); + taosMemoryFree(pInfo->pPrevGroupKey); + cleanupExprSupp(&pInfo->scalarSup); for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) {