diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b4489a92fb..6778e97d7a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2215,6 +2215,7 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp colDataAppend(pDst, rows, (char *)current.val, false); } + taosMemoryFree(current.val); pResBlock->info.rows += 1; break; } @@ -2437,10 +2438,10 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // if its the last point in data block, no need to fill, but reserve this point as the start value and do // the interpolation when processing next data block. if (pSliceInfo->fillType == TSDB_FILL_LINEAR) { - doKeepLinearInfo(pSliceInfo, pBlock, i, false); pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); if (i < pBlock->info.rows - 1) { + doKeepLinearInfo(pSliceInfo, pBlock, i, false); int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); if (nextTs > pSliceInfo->current) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { @@ -2478,11 +2479,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { doKeepPrevRows(pSliceInfo, pBlock, i); if (pSliceInfo->fillType == TSDB_FILL_LINEAR) { - doKeepLinearInfo(pSliceInfo, pBlock, i, false); // no need to increate pSliceInfo->current here //pSliceInfo->current = // taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); if (i < pBlock->info.rows - 1) { + doKeepLinearInfo(pSliceInfo, pBlock, i, false); int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); if (nextTs > pSliceInfo->current) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { @@ -2558,10 +2559,10 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pSliceInfo->fillType == TSDB_FILL_LINEAR) { - doKeepLinearInfo(pSliceInfo, pBlock, i, false); pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); if (i < pBlock->info.rows - 1) { + doKeepLinearInfo(pSliceInfo, pBlock, i, false); int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); if (nextTs > pSliceInfo->current) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { @@ -2618,6 +2619,35 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { return pResBlock->info.rows == 0 ? NULL : pResBlock; } +void destroyTimeSliceOperatorInfo(void* param, int32_t numOfOutput) { + STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param; + + pInfo->pRes = blockDataDestroy(pInfo->pRes); + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) { + SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i); + taosMemoryFree(pKey->pData); + } + taosArrayDestroy(pInfo->pPrevRow); + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pNextRow); ++i) { + SGroupKeys* pKey = taosArrayGet(pInfo->pNextRow, i); + taosMemoryFree(pKey->pData); + } + taosArrayDestroy(pInfo->pNextRow); + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) { + SFillLinearInfo* pKey = taosArrayGet(pInfo->pLinearInfo, i); + taosMemoryFree(pKey->start.val); + taosMemoryFree(pKey->end.val); + } + taosArrayDestroy(pInfo->pLinearInfo); + + taosMemoryFree(pInfo->pFillColInfo); + taosMemoryFreeClear(param); +} + + SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -2665,7 +2695,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL); + createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyTimeSliceOperatorInfo, NULL, NULL, NULL); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);