diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5fe64fef64..0adabed626 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -579,6 +579,7 @@ typedef struct SStreamIntervalOperatorInfo { SSDataBlock* pPullDataRes; bool isFinal; SArray* pChildren; + int32_t numOfChild; SStreamState* pState; SWinKey delKey; uint64_t numOfDatapack; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c2d2e92c79..89bce58cce 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1541,14 +1541,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { streamFileStateDestroy(pInfo->pState->pFileState); taosMemoryFreeClear(pInfo->pState); - if (pInfo->pChildren) { - int32_t size = taosArrayGetSize(pInfo->pChildren); - for (int32_t i = 0; i < size; i++) { - SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i); - destroyOperatorInfo(pChildOp); - } - taosArrayDestroy(pInfo->pChildren); - } nodesDestroyNode((SNode*)pInfo->pPhyNode); colDataDestroy(&pInfo->twAggSup.timeWindowData); pInfo->groupResInfo.pRows = taosArrayDestroy(pInfo->groupResInfo.pRows); @@ -2081,58 +2073,6 @@ int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SRowBuffPos return TSDB_CODE_SUCCESS; } -static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pUpdatedMap) { - SStreamIntervalOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - int32_t size = taosArrayGetSize(pWinArray); - int32_t numOfOutput = pOperator->exprSupp.numOfExprs; - SExprSupp* pSup = &pOperator->exprSupp; - if (!pInfo->pChildren) { - return; - } - for (int32_t i = 0; i < size; i++) { - SWinKey* pWinRes = taosArrayGet(pWinArray, i); - SRowBuffPos* pCurResPos = NULL; - SResultRow* pCurResult = NULL; - STimeWindow parentWin = getFinalTimeWindow(pWinRes->ts, &pInfo->interval); - if (isDeletedStreamWindow(&parentWin, pWinRes->groupId, pInfo->pState, &pInfo->twAggSup)) { - continue; - } - - int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); - int32_t num = 0; - for (int32_t j = 0; j < numOfChildren; j++) { - SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j); - SStreamIntervalOperatorInfo* pChInfo = pChildOp->info; - SExprSupp* pChildSup = &pChildOp->exprSupp; - if (!hasIntervalWindow(pChInfo->pState, pWinRes)) { - continue; - } - if (num == 0) { - int32_t code = setIntervalOutputBuf(pInfo->pState, &parentWin, &pCurResPos, pWinRes->groupId, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup); - ASSERT(pCurResPos != NULL); - pCurResult = (SResultRow*)pCurResPos->pRowBuff; - if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); - } - } - num++; - SRowBuffPos* pChResPos = NULL; - SResultRow* pChResult = NULL; - setIntervalOutputBuf(pChInfo->pState, &parentWin, &pChResPos, pWinRes->groupId, pChildSup->pCtx, - pChildSup->numOfExprs, pChildSup->rowEntryInfoOffset, &pChInfo->aggSup); - pChResult = (SResultRow*)pChResPos->pRowBuff; - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true); - compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); - } - if (num > 0 && pUpdatedMap) { - saveWinResultInfo(pCurResult->win.skey, pWinRes->groupId, pCurResPos, pUpdatedMap); - saveOutputBuf(pInfo->pState, pWinRes, pCurResult, pInfo->aggSup.resultRowSize); - } - } -} - bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) { if (pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) { SWinKey key = {.ts = pWin->skey, .groupId = groupId}; @@ -2250,9 +2190,8 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) { .window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) { - int32_t size1 = taosArrayGetSize(pInfo->pChildren); - addPullWindow(pInfo->pPullDataMap, winKey, size1); - qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1); + addPullWindow(pInfo->pPullDataMap, winKey, pInfo->numOfChild); + qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, pInfo->numOfChild); } } } @@ -2413,7 +2352,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p continue; } - if (IS_FINAL_OP(pInfo) && pInfo->pChildren) { + if (IS_FINAL_OP(pInfo) && pInfo->numOfChild > 0) { bool ignore = true; SWinKey winRes = { .ts = nextWin.skey, @@ -2425,8 +2364,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p .window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) { - int32_t size = taosArrayGetSize(pInfo->pChildren); - addPullWindow(pInfo->pPullDataMap, &winRes, size); + addPullWindow(pInfo->pPullDataMap, &winRes, pInfo->numOfChild); } } else { int32_t index = -1; @@ -2780,24 +2718,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, streamStateSetNumber(pInfo->pState, -1); initResultRowInfo(&pInfo->binfo.resultRowInfo); - pInfo->pChildren = NULL; - if (numOfChild > 0) { - pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*)); - if (!pInfo->pChildren) { - goto _error; - } - for (int32_t i = 0; i < numOfChild; i++) { - SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0); - if (pChildOp) { - SStreamIntervalOperatorInfo* pChInfo = pChildOp->info; - pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; - taosArrayPush(pInfo->pChildren, &pChildOp); - streamStateSetNumber(pChInfo->pState, i); - continue; - } - goto _error; - } - } + pInfo->numOfChild = numOfChild; pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode); @@ -3337,13 +3258,13 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; int32_t numOfOutput = pSup->numOfExprs; - int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); + int32_t numOfChild = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { SSessionKey* pWinKey = taosArrayGet(pWinArray, i); int32_t num = 0; SResultWindowInfo parentWin = {0}; - for (int32_t j = 0; j < numOfChildren; j++) { + for (int32_t j = 0; j < numOfChild; j++) { SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j); SStreamSessionAggOperatorInfo* pChInfo = pChild->info; SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup; @@ -4995,7 +4916,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pullIndex = 0; pInfo->pPullDataRes = NULL; pInfo->isFinal = false; - pInfo->pChildren = NULL; + pInfo->numOfChild = 0; pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0; pInfo->numOfDatapack = 0;