stream count window check point

This commit is contained in:
54liuyao 2024-02-05 15:10:03 +08:00
parent b16fc7d9df
commit 08cb46ac3e
2 changed files with 26 additions and 12 deletions

View File

@ -220,7 +220,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
TSKEY* startTsCols = (int64_t*)pStartTsCol->pData; TSKEY* startTsCols = (int64_t*)pStartTsCol->pData;
blockDataEnsureCapacity(pAggSup->pScanBlock, rows); blockDataEnsureCapacity(pAggSup->pScanBlock, rows * 2);
SStreamStateCur* pCur = NULL; SStreamStateCur* pCur = NULL;
COUNT_TYPE slidingRows = 0; COUNT_TYPE slidingRows = 0;

View File

@ -103,7 +103,12 @@ SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKe
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
pNewPos->needFree = true; pNewPos->needFree = true;
pNewPos->beFlushed = true; pNewPos->beFlushed = true;
if(p) {
memcpy(pNewPos->pRowBuff, p, *pVLen); memcpy(pNewPos->pRowBuff, p, *pVLen);
} else {
int32_t len = getRowStateRowSize(pFileState);
memset(pNewPos->pRowBuff, 0, len);
}
taosMemoryFree(p); taosMemoryFree(p);
return pNewPos; return pNewPos;
} }
@ -728,20 +733,29 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
int32_t size = taosArrayGetSize(pWinStates); int32_t size = taosArrayGetSize(pWinStates);
if (size == 0) { if (size == 0) {
void* pFileStore = getStateFileStore(pFileState); void* pFileStore = getStateFileStore(pFileState);
void* p = NULL; void* pRockVal = NULL;
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId);
int32_t code_file = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &p, pVLen); code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen);
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code);
code = code_file; if (code == TSDB_CODE_SUCCESS) {
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); int32_t valSize = *pVLen;
COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) );
if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
streamStateFreeCur(pCur);
goto _end;
}
}
pWinKey->win.skey = startTs;
pWinKey->win.ekey = endTs;
(*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
taosMemoryFree(pRockVal);
streamStateFreeCur(pCur);
} else { } else {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey); (*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey);
code = TSDB_CODE_FAILED; code = TSDB_CODE_FAILED;
taosMemoryFree(p);
} }
streamStateFreeCur(pCur);
goto _end; goto _end;
} }