diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 050e67e15d..857d048457 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -677,6 +677,14 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys goto _error; } + pInfo->twAggSup = (STimeWindowAggSupp){ + .waterMark = pCountNode->window.watermark, + .calTrigger = pCountNode->window.triggerType, + .maxTs = INT64_MIN, + .minTs = INT64_MAX, + .deleteMark = getDeleteMark(&pCountNode->window, 0), + }; + pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, @@ -687,13 +695,6 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->streamAggSup.windowCount = pCountNode->windowCount; pInfo->streamAggSup.windowSliding = pCountNode->windowSliding; - pInfo->twAggSup = (STimeWindowAggSupp){ - .waterMark = pCountNode->window.watermark, - .calTrigger = pCountNode->window.triggerType, - .maxTs = INT64_MIN, - .minTs = INT64_MAX, - }; - initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->binfo.pRes = pResBlock; diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 687b4bcf12..4c61e6da1d 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -734,6 +734,21 @@ _end: return code; } +int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) { + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey); + int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + streamStateFreeCur(pCur); + if (code == TSDB_CODE_SUCCESS) { + return code; + } else { + pCur = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey); + } + + code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + streamStateFreeCur(pCur); + return code; +} + int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) { SSessionKey* pWinKey = pKey; const TSKEY gap = 0; @@ -755,14 +770,13 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C if (size == 0) { void* pFileStore = getStateFileStore(pFileState); void* pRockVal = NULL; - SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); - code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen); - streamStateFreeCur(pCur); + code = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen); if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { - qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code); + qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, + pWinKey->win.ekey, code); if (code == TSDB_CODE_SUCCESS) { int32_t valSize = *pVLen; - COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) ); + COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE))); if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) { (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen); goto _end; @@ -798,20 +812,24 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C } if (index == -1) { - if (!isDeteled(pFileState, endTs)) { - void* p = NULL; - void* pFileStore = getStateFileStore(pFileState); - SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); - int32_t code_file = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &p, pVLen); + if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) { + SSessionKey tmpKey = *pWinKey; + void* pRockVal = NULL; + void* pFileStore = getStateFileStore(pFileState); + int32_t code_file = getCountWinStateFromDisc(pFileStore, &tmpKey, &pRockVal, pVLen); if (code_file == TSDB_CODE_SUCCESS) { - (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); - code = code_file; - qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); - streamStateFreeCur(pCur); - goto _end; + SRowBuffPos* pFirstPos = taosArrayGetP(pWinStates, 0); + SSessionKey* pFirstWinKey = (SSessionKey*)pFirstPos->pKey; + if (tmpKey.win.ekey < pFirstWinKey->win.skey) { + *pWinKey = tmpKey; + (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen); + code = code_file; + qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, + pWinKey->win.ekey, code_file); + goto _end; + } } - taosMemoryFree(p); - streamStateFreeCur(pCur); + taosMemoryFree(pRockVal); } }