diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 88716587e1..b3842243ed 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -3456,12 +3456,17 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { void streamStateReleaseState(SOperatorInfo* pOperator) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; - int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); + int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); + int32_t resSize = winSize + sizeof(TSKEY); + char* pBuff = taosMemoryCalloc(1, resSize); + memcpy(pBuff, pInfo->historyWins->pData, winSize); + memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); qDebug("===stream=== relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins)); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, - strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, - resSize); + strlen(STREAM_STATE_OP_STATE_NAME), pBuff, resSize); pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); + taosMemoryFreeClear(pBuff); + SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { downstream->fpSet.releaseStreamStateFn(downstream); @@ -3507,10 +3512,15 @@ void streamStateReloadState(SOperatorInfo* pOperator) { void* pBuf = NULL; int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME, strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size); - int32_t num = size / sizeof(SSessionKey); + int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey); qDebug("===stream=== reload state. get result count:%d", num); SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; - ASSERT(size == num * sizeof(SSessionKey)); + ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); + + TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY)); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); + pAggSup->stateStore.streamStateReloadInfo(pAggSup->pState, ts); + if (!pInfo->pSeUpdated && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index a4e06488d5..370a8f5224 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -88,6 +88,15 @@ static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* return pNewPos; } +SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen) { + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); + pNewPos->needFree = true; + memcpy(pNewPos->pRowBuff, p, *pVLen); + taosMemoryFree(p); + return pNewPos; +} + int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen) { int32_t code = TSDB_CODE_SUCCESS; SSHashObj* pSessionBuff = getRowStateBuff(pFileState); @@ -105,8 +114,17 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, int32_t size = taosArrayGetSize(pWinStates); if (size == 0) { - (*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey); - code = TSDB_CODE_FAILED; + void* pFileStore = getStateFileStore(pFileState); + void* p = NULL; + int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen); + if (code_file == TSDB_CODE_SUCCESS) { + (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen); + code = code_file; + qDebug("===stream===0 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file); + } else { + (*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey); + code = TSDB_CODE_FAILED; + } goto _end; } @@ -140,21 +158,14 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) { void* p = NULL; void* pFileStore = getStateFileStore(pFileState); - SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen); if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { - memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); - pNewPos->needFree = true; - - qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file); - memcpy(pNewPos->pRowBuff, p, *pVLen); - taosMemoryFree(p); - (*pVal) = pNewPos; + (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen); code = code_file; + qDebug("===stream===1 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file); goto _end; } else { taosMemoryFree(p); - streamFileStateReleaseBuff(pFileState, pNewPos, false); } } } @@ -479,8 +490,17 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch int32_t size = taosArrayGetSize(pWinStates); if (size == 0) { - (*pVal) = addNewSessionWindow(pFileState, pWinStates, key); - code = TSDB_CODE_FAILED; + void* pFileStore = getStateFileStore(pFileState); + void* p = NULL; + int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen); + if (code_file == TSDB_CODE_SUCCESS) { + (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); + code = code_file; + qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); + } else { + (*pVal) = addNewSessionWindow(pFileState, pWinStates, key); + code = TSDB_CODE_FAILED; + } goto _end; } @@ -515,22 +535,17 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch if (index + 1 == 0) { if (!isDeteled(pFileState, endTs)) { - void* p = NULL; - void* pFileStore = getStateFileStore(pFileState); - SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); - pNewPos->needFree = true; - int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen); + void* p = NULL; + void* pFileStore = getStateFileStore(pFileState); + int32_t code_file = + streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen); if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { - memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey)); - qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); - memcpy(pNewPos->pRowBuff, p, *pVLen); - (*pVal) = pNewPos; - taosMemoryFree(p); + (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); code = code_file; + qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); goto _end; } else { taosMemoryFree(p); - streamFileStateReleaseBuff(pFileState, pNewPos, false); } } }