diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 5dcdfd2791..53fe76d851 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -42,7 +42,7 @@ typedef struct STimeSliceOperatorInfo { SRowKey prevKey; bool prevTsSet; uint64_t groupId; - SGroupKeys* pPrevGroupKey; + SArray* pPrevGroupKeys; SSDataBlock* pNextGroupRes; SSDataBlock* pRemainRes; // save block unfinished processing int32_t remainIndex; // the remaining index in the block to be processed @@ -288,6 +288,7 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp // output the result int32_t fillColIndex = 0; + int32_t groupKeyIndex = 0; bool hasInterp = true; for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; @@ -320,7 +321,9 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp QUERY_CHECK_CODE(code, lino, _end); } else if (!isSelectGroupConstValueFunc(pExprInfo)) { // use stored group key - SGroupKeys* pkey = pSliceInfo->pPrevGroupKey; + SGroupKeys *pkey = taosArrayGet(pSliceInfo->pPrevGroupKeys, groupKeyIndex); + QUERY_CHECK_NULL(pkey, code, lino, _end, terrno); + groupKeyIndex++; if (pkey->isNull == false) { code = colDataSetVal(pDst, rows, pkey->pData, false); QUERY_CHECK_CODE(code, lino, _end); @@ -645,13 +648,20 @@ _end: return code; } +static void destroyGroupKey(void* pKey) { + SGroupKeys* key = (SGroupKeys*)pKey; + if (key->pData != NULL) { + taosMemoryFreeClear(key->pData); + } +} + static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExprSup) { - if (pInfo->pPrevGroupKey != NULL) { + if (pInfo->pPrevGroupKeys != NULL) { return TSDB_CODE_SUCCESS; } - pInfo->pPrevGroupKey = taosMemoryCalloc(1, sizeof(SGroupKeys)); - if (pInfo->pPrevGroupKey == NULL) { + pInfo->pPrevGroupKeys = taosArrayInit(pExprSup->numOfExprs, sizeof(SGroupKeys)); + if (pInfo->pPrevGroupKeys == NULL) { return terrno; } @@ -659,11 +669,17 @@ static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExp SExprInfo* pExprInfo = &pExprSup->pExprInfo[i]; if (isGroupKeyFunc(pExprInfo)) { - pInfo->pPrevGroupKey->bytes = pExprInfo->base.resSchema.bytes; - pInfo->pPrevGroupKey->type = pExprInfo->base.resSchema.type; - pInfo->pPrevGroupKey->isNull = false; - pInfo->pPrevGroupKey->pData = taosMemoryCalloc(1, pInfo->pPrevGroupKey->bytes); - if (!pInfo->pPrevGroupKey->pData) { + SGroupKeys key = {.bytes = pExprInfo->base.resSchema.bytes, + .type = pExprInfo->base.resSchema.type, + .isNull = false, + .pData = taosMemoryCalloc(1, pExprInfo->base.resSchema.bytes)}; + if (!key.pData) { + taosArrayDestroyEx(pInfo->pPrevGroupKeys, destroyGroupKey); + return terrno; + } + if (NULL == taosArrayPush(pInfo->pPrevGroupKeys, &key)) { + taosMemoryFree(key.pData); + taosArrayDestroyEx(pInfo->pPrevGroupKeys, destroyGroupKey); return terrno; } } @@ -910,7 +926,7 @@ static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperato SInterval* pInterval = &pSliceInfo->interval; if (pSliceInfo->fillType == TSDB_FILL_NEXT || pSliceInfo->fillType == TSDB_FILL_LINEAR || - pSliceInfo->pPrevGroupKey == NULL) { + pSliceInfo->pPrevGroupKeys == NULL) { return; } @@ -921,12 +937,18 @@ static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperato } } -static void copyPrevGroupKey(SExprSupp* pExprSup, SGroupKeys* pGroupKey, SSDataBlock* pSrcBlock) { +static int32_t copyPrevGroupKey(SExprSupp* pExprSup, SArray * pGroupKeys, SSDataBlock* pSrcBlock) { + int32_t groupKeyIdx = 0; for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; if (isGroupKeyFunc(pExprInfo)) { int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + SGroupKeys *pGroupKey = taosArrayGet(pGroupKeys, groupKeyIdx); + if (pGroupKey == NULL) { + return terrno; + } + groupKeyIdx++; SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); if (colDataIsNull_s(pSrc, 0)) { @@ -942,9 +964,9 @@ static void copyPrevGroupKey(SExprSupp* pExprSup, SGroupKeys* pGroupKey, SSDataB } pGroupKey->isNull = false; - break; } } + return TSDB_CODE_SUCCESS; } static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) { @@ -986,7 +1008,11 @@ static void doHandleTimeslice(SOperatorInfo* pOperator, SSDataBlock* pBlock) { T_LONG_JMP(pTaskInfo->env, code); } doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo, ignoreNull); - copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock); + code = copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKeys, pBlock); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + T_LONG_JMP(pTaskInfo->env, code); + } } static int32_t doTimesliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { @@ -1160,7 +1186,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN pInfo->prevTsSet = false; pInfo->prevKey.ts = INT64_MIN; pInfo->groupId = 0; - pInfo->pPrevGroupKey = NULL; + pInfo->pPrevGroupKeys = NULL; pInfo->pNextGroupRes = NULL; pInfo->pRemainRes = NULL; pInfo->remainIndex = 0; @@ -1233,9 +1259,9 @@ void destroyTimeSliceOperatorInfo(void* param) { } taosArrayDestroy(pInfo->pLinearInfo); - if (pInfo->pPrevGroupKey) { - taosMemoryFree(pInfo->pPrevGroupKey->pData); - taosMemoryFree(pInfo->pPrevGroupKey); + if (pInfo->pPrevGroupKeys) { + taosArrayDestroyEx(pInfo->pPrevGroupKeys, destroyGroupKey); + pInfo->pPrevGroupKeys = NULL; } if (pInfo->hasPk && IS_VAR_DATA_TYPE(pInfo->pkCol.type)) { taosMemoryFreeClear(pInfo->prevKey.pks[0].pData);