diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index f4713f7a6f..3bef15f3a7 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -56,7 +56,8 @@ typedef struct { void* pStateBackend; struct SStorageAPI api; - int8_t fillHistory; + int8_t fillHistory; + STimeWindow winRange; } SReadHandle; // in queue mode, data streams are seperated by msg diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 761def6dc6..250c94c2f9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -826,7 +826,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { return -1; } - SReadHandle handle = {.vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory}; + SReadHandle handle = {.vnode = pTq->pVnode, + .initTqReader = 1, + .pStateBackend = pTask->pState, + .fillHistory = pTask->info.fillHistory, + .winRange = pTask->dataRange.window}; initStorageAPI(&handle.api); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); @@ -849,7 +853,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList); - SReadHandle handle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory}; + SReadHandle handle = {.vnode = NULL, + .numOfVgroups = numOfVgroups, + .pStateBackend = pTask->pState, + .fillHistory = pTask->info.fillHistory, + .winRange = pTask->dataRange.window}; initStorageAPI(&handle.api); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 522f648ccc..0ba9aae133 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -285,6 +285,8 @@ typedef struct SStreamAggSupporter { int16_t stateKeyType; SDiskbasedBuf* pResultBuf; SStateStore stateStore; + STimeWindow winRange; + SStorageAPI* pSessionAPI; } SStreamAggSupporter; typedef struct SWindowSupporter { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 13d24aa531..c4111ded92 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2966,7 +2966,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin } int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap, - SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore) { + SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, SStorageAPI* pApi) { pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput); pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR); pSup->gap = gap; @@ -3008,6 +3008,16 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, pCtx[i].saveHandle.pBuf = pSup->pResultBuf; } + if (pHandle) { + pSup->winRange = pHandle->winRange; + // temporary + if (pSup->winRange.ekey <= 0) { + pSup->winRange.ekey = INT64_MAX; + } + } + + pSup->pSessionAPI = pApi; + return TSDB_CODE_SUCCESS; } @@ -3035,6 +3045,13 @@ void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT bool isInvalidSessionWin(SResultWindowInfo* pWinInfo) { return pWinInfo->sessionWin.win.skey == 0; } +bool inWinRange(STimeWindow* range, STimeWindow* cur) { + if (cur->skey >= range->skey && cur->ekey <= range->ekey) { + return true; + } + return false; +} + void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SResultWindowInfo* pCurWin) { pCurWin->sessionWin.groupId = groupId; @@ -3043,6 +3060,12 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT int32_t size = pAggSup->resultRowSize; int32_t code = pAggSup->stateStore.streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin, pAggSup->gap, &pCurWin->pOutputBuf, &size); + if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->sessionWin.win)) { + code = TSDB_CODE_FAILED; + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->pOutputBuf, &pAggSup->pSessionAPI->stateStore); + pCurWin->pOutputBuf = taosMemoryMalloc(size); + } + if (code == TSDB_CODE_SUCCESS) { pCurWin->isOutput = true; } else { @@ -3189,7 +3212,8 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC while (1) { SResultWindowInfo winInfo = {0}; SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, pStUpdated, pCurWin, &winInfo); - if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap)) { + if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap) || + !inWinRange(&pAggSup->winRange, &winInfo.sessionWin.win)) { taosMemoryFree(winInfo.pOutputBuf); pAPI->stateStore.streamStateFreeCur(pCur); break; @@ -3413,8 +3437,12 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS SResultWindowInfo childWin = {0}; childWin.sessionWin = *pWinKey; int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin); - if (code == TSDB_CODE_SUCCESS && pWinKey->win.skey <= childWin.sessionWin.win.skey && - childWin.sessionWin.win.ekey <= pWinKey->win.ekey) { + + if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &childWin.sessionWin.win)) { + continue; + } + + if (code == TSDB_CODE_SUCCESS && inWinRange(&pWinKey->win, &childWin.sessionWin.win)) { if (num == 0) { setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin); code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); @@ -3678,9 +3706,16 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) { } } +void resetWinRange(STimeWindow* winRange) { + winRange->skey = INT16_MIN; + winRange->skey = INT16_MAX; +} + void streamSessionReloadState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + resetWinRange(&pAggSup->winRange); + SResultWindowInfo winInfo = {0}; int32_t size = 0; void* pBuf = NULL; @@ -3734,7 +3769,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh } code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap, - pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore); + pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pTaskInfo->storageAPI); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4024,6 +4059,12 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys); pCurWin->pStateKey->isNull = false; + if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->winInfo.sessionWin.win)) { + code = TSDB_CODE_FAILED; + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); + pCurWin->winInfo.pOutputBuf = taosMemoryMalloc(size); + } + if (code == TSDB_CODE_SUCCESS) { pCurWin->winInfo.isOutput = true; } else if (pKeyData) { @@ -4292,6 +4333,8 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur void streamStateReloadState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + resetWinRange(&pAggSup->winRange); + SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0}; int32_t size = 0; void* pBuf = NULL; @@ -4361,7 +4404,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes; int16_t type = pColNode->node.resType.type; code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, - type, &pTaskInfo->storageAPI.stateStore); + type, &pTaskInfo->storageAPI.stateStore, pHandle, &pTaskInfo->storageAPI); if (code != TSDB_CODE_SUCCESS) { goto _error; }