diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 4a1a6aa2ad..1045480e7e 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -220,7 +220,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); TSKEY* startTsCols = (int64_t*)pStartTsCol->pData; - blockDataEnsureCapacity(pAggSup->pScanBlock, rows); + blockDataEnsureCapacity(pAggSup->pScanBlock, rows * 2); SStreamStateCur* pCur = NULL; COUNT_TYPE slidingRows = 0; diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 9d5a594f46..69a4a6b5a3 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -103,7 +103,12 @@ SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKe memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); pNewPos->needFree = true; pNewPos->beFlushed = true; - memcpy(pNewPos->pRowBuff, p, *pVLen); + if(p) { + memcpy(pNewPos->pRowBuff, p, *pVLen); + } else { + int32_t len = getRowStateRowSize(pFileState); + memset(pNewPos->pRowBuff, 0, len); + } taosMemoryFree(p); return pNewPos; } @@ -727,21 +732,30 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C int32_t size = taosArrayGetSize(pWinStates); if (size == 0) { - void* pFileStore = getStateFileStore(pFileState); - void* p = NULL; - + void* pFileStore = getStateFileStore(pFileState); + void* pRockVal = NULL; SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); - int32_t code_file = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &p, pVLen); - if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { - (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); - code = code_file; - qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); + code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen); + if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { + qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code); + if (code == TSDB_CODE_SUCCESS) { + int32_t valSize = *pVLen; + COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) ); + if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) { + (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen); + streamStateFreeCur(pCur); + goto _end; + } + } + pWinKey->win.skey = startTs; + pWinKey->win.ekey = endTs; + (*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL); + taosMemoryFree(pRockVal); + streamStateFreeCur(pCur); } else { (*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey); code = TSDB_CODE_FAILED; - taosMemoryFree(p); } - streamStateFreeCur(pCur); goto _end; }