From 84817255179df77c6d4f6dfa3d9bcff6952eff2e Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 19 Jul 2023 14:58:28 +0800 Subject: [PATCH] remove redundant results --- source/libs/executor/src/timewindowoperator.c | 57 +++++++++++-------- source/libs/stream/src/streamBackendRocksdb.c | 5 -- 2 files changed, 33 insertions(+), 29 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 51da73d4d7..5d1fc0530f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2914,6 +2914,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) { blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pWinBlock); blockDataDestroy(pInfo->pUpdateRes); + tSimpleHashCleanup(pInfo->pStUpdated); tSimpleHashCleanup(pInfo->pStDeleted); taosArrayDestroy(pInfo->historyWins); @@ -3008,14 +3009,6 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, pCtx[i].saveHandle.pBuf = pSup->pResultBuf; } - if (pHandle) { - pSup->winRange = pHandle->winRange; - // temporary - if (pSup->winRange.ekey <= 0) { - pSup->winRange.ekey = INT64_MAX; - } - } - pSup->pSessionAPI = pApi; return TSDB_CODE_SUCCESS; @@ -3063,11 +3056,12 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->sessionWin.win)) { code = TSDB_CODE_FAILED; releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->pOutputBuf, &pAggSup->pSessionAPI->stateStore); - pCurWin->pOutputBuf = taosMemoryMalloc(size); + pCurWin->pOutputBuf = taosMemoryCalloc(1, size); } if (code == TSDB_CODE_SUCCESS) { pCurWin->isOutput = true; + pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->sessionWin); } else { pCurWin->sessionWin.win.skey = startTs; pCurWin->sessionWin.win.ekey = endTs; @@ -3198,7 +3192,7 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* } static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated, - SSHashObj* pStDeleted) { + SSHashObj* pStDeleted, bool addGap) { SExprSupp* pSup = &pOperator->exprSupp; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; @@ -3222,7 +3216,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC initSessionOutputBuf(&winInfo, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset); pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey); int64_t winDelta = 0; - if (IS_FINAL_OP(pInfo)) { + if (addGap) { winDelta = pAggSup->gap; } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, winDelta); @@ -3240,6 +3234,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) { saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore); + pWinInfo->pOutputBuf = NULL; return TSDB_CODE_SUCCESS; } @@ -3253,8 +3248,10 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData SResultRow* pResult = NULL; int32_t rows = pSDataBlock->info.rows; int32_t winRows = 0; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version); + pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow; SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); TSKEY* startTsCols = (int64_t*)pStartTsCol->pData; @@ -3266,7 +3263,6 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } TSKEY* endTsCols = (int64_t*)pEndTsCol->pData; - SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; for (int32_t i = 0; i < rows;) { if (pInfo->ignoreExpiredData && isOverdue(endTsCols[i], &pInfo->twAggSup)) { i++; @@ -3291,7 +3287,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData if (code != TSDB_CODE_SUCCESS || pResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } - compactSessionWindow(pOperator, &winInfo, pStUpdated, pStDeleted); + compactSessionWindow(pOperator, &winInfo, pStUpdated, pStDeleted, addGap); saveSessionOutputBuf(pAggSup, &winInfo); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) { @@ -3455,7 +3451,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput, pChild->exprSupp.rowEntryInfoOffset); compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); - compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL); + compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true); saveResult(parentWin, pStUpdated); } else { break; @@ -3707,8 +3703,8 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) { } void resetWinRange(STimeWindow* winRange) { - winRange->skey = INT16_MIN; - winRange->skey = INT16_MAX; + winRange->skey = INT64_MIN; + winRange->ekey = INT64_MAX; } void streamSessionReloadState(SOperatorInfo* pOperator) { @@ -3724,10 +3720,16 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { int32_t num = size / sizeof(SSessionKey); SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf; ASSERT(size == num * sizeof(SSessionKey)); + if (!pInfo->pStUpdated && num > 0) { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pStUpdated = tSimpleHashInit(64, hashFn); + } for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); - compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted); + compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true); + saveSessionOutputBuf(pAggSup, &winInfo); + saveResult(winInfo, pInfo->pStUpdated); } taosMemoryFree(pBuf); @@ -4019,6 +4021,7 @@ void destroyStreamStateOperatorInfo(void* param) { colDataDestroy(&pInfo->twAggSup.timeWindowData); blockDataDestroy(pInfo->pDelRes); taosArrayDestroy(pInfo->historyWins); + tSimpleHashCleanup(pInfo->pSeUpdated); tSimpleHashCleanup(pInfo->pSeDeleted); taosMemoryFreeClear(param); } @@ -4073,6 +4076,7 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, if (code == TSDB_CODE_SUCCESS) { pCurWin->winInfo.isOutput = true; + pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin); } else if (pKeyData) { if (IS_VAR_DATA_TYPE(pAggSup->stateKeyType)) { varDataCopy(pCurWin->pStateKey->pData, pKeyData); @@ -4145,8 +4149,10 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl TSKEY* tsCols = NULL; SResultRow* pResult = NULL; int32_t winRows = 0; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version); + pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow; if (pSDataBlock->pDataBlock != NULL) { SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); @@ -4155,7 +4161,6 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl return; } - SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; int32_t rows = pSDataBlock->info.rows; blockDataEnsureCapacity(pAggSup->pScanBlock, rows); SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId); @@ -4335,7 +4340,7 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur SResultRow* pWinResult = NULL; initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset); - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, true); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, 1); compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey)); if (pNextWin->isOutput && pStDeleted) { @@ -4344,11 +4349,10 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin); doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin); taosMemoryFree(pNextWin->pOutputBuf); - saveSessionOutputBuf(pAggSup, pCurWin); } void streamStateReloadState(SOperatorInfo* pOperator) { - SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; resetWinRange(&pAggSup->winRange); @@ -4360,14 +4364,19 @@ void streamStateReloadState(SOperatorInfo* pOperator) { int32_t num = size / sizeof(SSessionKey); SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf; ASSERT(size == num * sizeof(SSessionKey)); + if (!pInfo->pSeUpdated && num > 0) { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + } for (int32_t i = 0; i < num; i++) { SStateWindowInfo curInfo = {0}; SStateWindowInfo nextInfo = {0}; SStateWindowInfo dummy = {0}; setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo); if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) { - compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted); - saveResult(curInfo.winInfo, pInfo->pStUpdated); + compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated); + saveSessionOutputBuf(pAggSup, &curInfo.winInfo); + saveResult(curInfo.winInfo, pInfo->pSeUpdated); } if (IS_VALID_SESSION_WIN(curInfo.winInfo)) { @@ -4855,7 +4864,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup); } - updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true); + updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1); applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, iaInfo, pResult); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 18ec80e87a..ffefdb0003 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1864,7 +1864,6 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe if (sessionRangeKeyCmpr(&searchKey, key) == 0) { memcpy(tmp, *pVal, valSize); taosMemoryFreeClear(*pVal); - streamStateSessionDel_rocksdb(pState, key); goto _end; } taosMemoryFreeClear(*pVal); @@ -1880,7 +1879,6 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe if (code == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) { memcpy(tmp, *pVal, valSize); - streamStateSessionDel_rocksdb(pState, key); goto _end; } } @@ -1938,14 +1936,12 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* if (code == 0) { if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { memcpy(tmp, *pVal, valSize); - streamStateSessionDel_rocksdb(pState, key); goto _end; } void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); if (fn(pKeyData, stateKey) == true) { memcpy(tmp, *pVal, valSize); - streamStateSessionDel_rocksdb(pState, key); goto _end; } @@ -1961,7 +1957,6 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); if (fn(pKeyData, stateKey) == true) { memcpy(tmp, *pVal, valSize); - streamStateSessionDel_rocksdb(pState, key); goto _end; } }