diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 2b567a7370..c1974df7de 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -79,6 +79,9 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen); int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos); +SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen); +int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId); + void sessionWinStateClear(SStreamFileState* pFileState); void sessionWinStateCleanup(void* pBuff); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index c9da3c99e7..6fc862b438 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -3590,7 +3590,6 @@ void streamStateReloadState(SOperatorInfo* pOperator) { for (int32_t i = 0; i < num; i++) { SStateWindowInfo curInfo = {0}; SStateWindowInfo nextInfo = {0}; - SStateWindowInfo dummy = {0}; qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, i); getStateWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo, &nextInfo); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index ac404893f0..584e81fafc 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -177,7 +177,10 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ // todo(liuyao) optimize if (type == STREAM_STATE_BUFF_HASH) { recoverSnapshot(pFileState, checkpointId); + } else { + recoverSesssion(pFileState, checkpointId); } + return pFileState; _error: @@ -642,12 +645,24 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { } int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { - int code = TSDB_CODE_SUCCESS; + int code = TSDB_CODE_SUCCESS; + if (pFileState->maxTs != INT64_MIN) { + int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) + ? INT64_MIN + : pFileState->maxTs - pFileState->deleteMark; + deleteExpiredCheckPoint(pFileState, mark); + } + SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore); if (pCur == NULL) { return -1; } + int32_t recoverNum = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount); while (code == TSDB_CODE_SUCCESS) { + if (pFileState->curRowCount >= recoverNum) { + break; + } + void* pVal = NULL; int32_t vlen = 0; SSessionKey key = {0}; @@ -655,12 +670,14 @@ int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { if (code != 0) { break; } - taosMemoryFree(pVal); + SRowBuffPos* pPos = createSessionWinBuff(pFileState, &key, pVal, &vlen); + putSessionWinResultBuff(pFileState, pPos); code = streamStateSessionCurPrev_rocksdb(pCur); } streamStateFreeCur(pCur); return code; } + int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { int32_t code = TSDB_CODE_SUCCESS; if (pFileState->maxTs != INT64_MIN) { @@ -674,11 +691,12 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { if (pCur == NULL) { return -1; } - + int32_t recoverNum = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount); while (code == TSDB_CODE_SUCCESS) { - if (pFileState->curRowCount == pFileState->maxRowCount) { + if (pFileState->curRowCount >= recoverNum) { break; } + void* pVal = NULL; int32_t vlen = 0; SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);