From eaf10fcb4360267392a4d0b25510a14ed38687e8 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 5 Feb 2024 18:37:24 +0800 Subject: [PATCH] count window fill history --- .../libs/executor/src/countwindowoperator.c | 16 +++++++++++- .../executor/src/streamcountwindowoperator.c | 26 ++++++++++++++++--- source/libs/stream/src/streamSessionState.c | 4 +-- 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index ffed6a7788..8a8f43a5e7 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -64,12 +64,26 @@ int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { SWindowRowsSup* pRowSup = &pInfo->winSup; int32_t rowIndex = 0; int32_t code = TSDB_CODE_SUCCESS; + int32_t step = 0; - for (int32_t i = 0; i < pBlock->info.rows; i++) { + for (int32_t i = 0; i < pBlock->info.rows;) { // todo(liuyao) 1.如果group id发生变化,获取新group id上一次的window的缓存,并把旧group id的信息存入缓存。 + // 没有sliding + // 只需要一个缓存即可 + // 1.如果group id发生变化,说明本group窗口全部结束,输出上次的缓存(这里需要判断缓存中是否有数据) + // 设置缓存 // 2.计算 当前需要合并的行数 // 3.做聚集计算。 // 4.达到行数,将结果存入pInfo->res中。 + + // 有sliding + // 缓存是一个队列 + // 1.如果group id发生变化,说明本group窗口全部结束,输出上次的缓存(这里需要判断缓存中是否有数据,可能输出多行) + // pInfo记录队列的起始位置 + // 2.计算 当前需要合并的行数 + // 3.做聚集计算。 + // 4.达到行数(pInfo记录队列的起始位置后移),将结果存入pInfo->res中。 + i += step; } return code; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 1045480e7e..440cfe67d8 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -117,6 +117,7 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStDeleted, bool* pRebuild) { + SSessionKey sWinKey = pWinInfo->winInfo.sessionWin; int32_t num = 0; for (int32_t i = start; i < rows; i++) { if (pTs[i] < pWinInfo->winInfo.sessionWin.win.ekey) { @@ -148,7 +149,7 @@ int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pW if (needDelState) { memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey)); if (pWinInfo->winInfo.pStatePos->needFree) { - pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pWinInfo->winInfo.sessionWin); + pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey); } } @@ -576,7 +577,15 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) { } void streamCountReleaseState(SOperatorInfo* pOperator) { - //nothing + SStreamEventAggOperatorInfo* pInfo = pOperator->info; + int32_t resSize = sizeof(TSKEY); + char* pBuff = taosMemoryCalloc(1, resSize); + memcpy(pBuff, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); + qDebug("===stream=== count window operator relase state. "); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_STATE_NAME, + strlen(STREAM_COUNT_OP_STATE_NAME), pBuff, resSize); + pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); + taosMemoryFreeClear(pBuff); SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { downstream->fpSet.releaseStreamStateFn(downstream); @@ -584,11 +593,22 @@ void streamCountReleaseState(SOperatorInfo* pOperator) { } void streamCountReloadState(SOperatorInfo* pOperator) { - // nothing + SStreamCountAggOperatorInfo* pInfo = pOperator->info; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + int32_t size = 0; + void* pBuf = NULL; + + int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_COUNT_OP_STATE_NAME, + strlen(STREAM_COUNT_OP_STATE_NAME), &pBuf, &size); + TSKEY ts = *(TSKEY*)pBuf; + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); + taosMemoryFree(pBuf); + SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); } + reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup); } SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 69a4a6b5a3..bd28d2bca9 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -786,10 +786,10 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); code = code_file; qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); + streamStateFreeCur(pCur); goto _end; - } else { - taosMemoryFree(p); } + taosMemoryFree(p); streamStateFreeCur(pCur); } }