From 4979fc5283607a617d71dbfb8935e0c1ea2b42d2 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 22 Sep 2023 09:40:08 +0800 Subject: [PATCH] fix issue --- include/libs/stream/tstreamFileState.h | 2 + .../executor/src/streamtimewindowoperator.c | 40 ++++++-- source/libs/stream/src/streamSessionState.c | 24 +++-- source/libs/stream/src/streamState.c | 3 +- source/libs/stream/src/tstreamFileState.c | 95 ++++++++++++------- 5 files changed, 113 insertions(+), 51 deletions(-) diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 70199731e5..36f4274faa 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -34,6 +34,7 @@ typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t k typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen); typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen); typedef void (*_state_buff_cleanup_fn)(void* pRowBuff); +typedef void* (*_state_buff_create_statekeyfn)(SRowBuffPos* pPos, int64_t num); typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const void* pKey); typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen); @@ -72,6 +73,7 @@ int32_t getRowStateRowSize(SStreamFileState* pFileState); // session window int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen); +int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen); int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen); void sessionWinStateClear(SStreamFileState* pFileState); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index aa9ba4e4b0..05391df671 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2595,11 +2595,15 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { void streamSessionReleaseState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); + int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); + int32_t resSize = winSize + sizeof(TSKEY); + char* pBuff = taosMemoryCalloc(1, resSize); + memcpy(pBuff, pInfo->historyWins->pData, winSize); + memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, - strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, - resSize); + strlen(STREAM_SESSION_OP_STATE_NAME), pBuff, resSize); pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); + taosMemoryFreeClear(pBuff); SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { downstream->fpSet.releaseStreamStateFn(downstream); @@ -2621,16 +2625,19 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) { void* pBuf = NULL; int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); - int32_t num = size / sizeof(SSessionKey); + int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey); SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf; - ASSERT(size == num * sizeof(SSessionKey)); + ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); compactSessionSemiWindow(pOperator, &winInfo); saveSessionOutputBuf(pAggSup, &winInfo); } + TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY)); taosMemoryFree(pBuf); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); + pAggSup->stateStore.streamStateReloadInfo(pAggSup->pState, ts); SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.reloadStreamStateFn) { @@ -2638,26 +2645,41 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) { } } +void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo) { + int32_t rowSize = pAggSup->resultRowSize; + int32_t code = pAggSup->stateStore.streamStateSessionGet(pAggSup->pState, pKey, (void**)&pWinInfo->pStatePos, &rowSize); + if (code == TSDB_CODE_SUCCESS) { + pWinInfo->sessionWin = *pKey; + pWinInfo->isOutput = true; + } else { + SET_SESSION_WIN_INVALID((*pWinInfo)); + } +} + void streamSessionReloadState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; resetWinRange(&pAggSup->winRange); - SResultWindowInfo winInfo = {0}; int32_t size = 0; void* pBuf = NULL; int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); - int32_t num = size / sizeof(SSessionKey); + int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey); SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; - ASSERT(size == num * sizeof(SSessionKey)); + ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); + + TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY)); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); + pAggSup->stateStore.streamStateReloadInfo(pAggSup->pState, ts); + if (!pInfo->pStUpdated && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStUpdated = tSimpleHashInit(64, hashFn); } for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; - setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); + getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo); int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true); if (winNum > 0) { qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, winInfo.sessionWin.win.skey, diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 344c3845af..8bb0de6a1a 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -135,16 +135,15 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, if (index + 1 == 0) { if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs)) { - int32_t len = 0; void* p = NULL; void* pFileStore = getStateFileStore(pFileState); - int32_t code = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pWinKey, gap, &p, &len); + int32_t code = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pWinKey, gap, &p, pVLen); SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); pNewPos->needFree = true; qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code); if (code == TSDB_CODE_SUCCESS) { - memcpy(pNewPos->pRowBuff, p, len); + memcpy(pNewPos->pRowBuff, p, *pVLen); } taosMemoryFree(p); (*pVal) = pNewPos; @@ -162,6 +161,20 @@ _end: return (*pVal) != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED; } +int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) { + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + pNewPos->needFree = true; + void* pBuff = NULL; + int32_t code = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + memcpy(pNewPos->pRowBuff, pBuff, *pVLen); + taosMemoryFreeClear(pBuff); + (*pVal) = pNewPos; + return TSDB_CODE_SUCCESS; +} + int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen) { SSHashObj* pSessionBuff = (SSHashObj*) pBuff; SSessionKey* pWinKey = (SSessionKey*) key; @@ -446,16 +459,15 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch if (index + 1 == 0) { if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs)) { - int32_t len = 0; void* p = NULL; void* pFileStore = getStateFileStore(pFileState); - int32_t code = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, &len); + int32_t code = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen); SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); pNewPos->needFree = true; qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code); if (code == TSDB_CODE_SUCCESS) { - memcpy(pNewPos->pRowBuff, p, len); + memcpy(pNewPos->pRowBuff, p, *pVLen); } taosMemoryFree(p); (*pVal) = pNewPos; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 4c65f403a9..68ba8890ce 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -720,8 +720,7 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { #ifdef USE_ROCKSDB - ASSERT(0); - return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen); + return getSessionFlushedBuff(pState->pFileState, key, pVal, pVLen); #else SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index b1318896e0..a13cb33042 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -28,29 +28,31 @@ #define MIN_NUM_OF_ROW_BUFF 10240 struct SStreamFileState { - SList* usedBuffs; - SList* freeBuffs; - void* rowStateBuff; - void* pFileStore; - int32_t rowSize; - int32_t selectivityRowSize; - int32_t keyLen; - uint64_t preCheckPointVersion; - uint64_t checkPointVersion; - TSKEY maxTs; - TSKEY deleteMark; - TSKEY flushMark; - uint64_t maxRowCount; - uint64_t curRowCount; - GetTsFun getTs; - char* id; + SList* usedBuffs; + SList* freeBuffs; + void* rowStateBuff; + void* pFileStore; + int32_t rowSize; + int32_t selectivityRowSize; + int32_t keyLen; + uint64_t preCheckPointVersion; + uint64_t checkPointVersion; + TSKEY maxTs; + TSKEY deleteMark; + TSKEY flushMark; + uint64_t maxRowCount; + uint64_t curRowCount; + GetTsFun getTs; + char* id; + char* cfName; - _state_buff_cleanup_fn stateBuffCleanupFn; - _state_buff_remove_fn stateBuffRemoveFn; + _state_buff_cleanup_fn stateBuffCleanupFn; + _state_buff_remove_fn stateBuffRemoveFn; + _state_buff_create_statekeyfn stateBuffCreateStateKeyFn; - _state_file_remove_fn stateFileRemoveFn; - _state_file_get_fn stateFileGetFn; - _state_file_clear_fn stateFileClearFn; + _state_file_remove_fn stateFileRemoveFn; + _state_file_get_fn stateFileGetFn; + _state_file_clear_fn stateFileClearFn; }; typedef SRowBuffPos SRowBuffInfo; @@ -75,6 +77,14 @@ int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, return streamStateGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen); } +void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) { + SStateKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateKey)); + SWinKey* pWinKey = pPos->pKey; + pStateKey->key = *pWinKey; + pStateKey->opNum = num; + return pStateKey; +} + int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) { return streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey); } @@ -83,6 +93,14 @@ int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, i return streamStateSessionGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen); } +void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) { + SStateSessionKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateSessionKey)); + SSessionKey* pWinKey = pPos->pKey; + pStateKey->key = *pWinKey; + pStateKey->opNum = num; + return pStateKey; +} + SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type) { @@ -107,18 +125,22 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn); pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn; pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn; + pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey; pFileState->stateFileRemoveFn = intervalFileRemoveFn; pFileState->stateFileGetFn = intervalFileGetFn; pFileState->stateFileClearFn = streamStateClear_rocksdb; + pFileState->cfName = taosStrdup("state"); } else { pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn); pFileState->stateBuffCleanupFn = sessionWinStateCleanup; pFileState->stateBuffRemoveFn = deleteSessionWinStateBuff; + pFileState->stateBuffCreateStateKeyFn = sessionCreateStateKey; pFileState->stateFileRemoveFn = sessionFileRemoveFn; pFileState->stateFileGetFn = sessionFileGetFn; pFileState->stateFileClearFn = streamStateSessionClear_rocksdb; + pFileState->cfName = taosStrdup("sess"); } if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) { @@ -183,6 +205,7 @@ void streamFileStateDestroy(SStreamFileState* pFileState) { } taosMemoryFree(pFileState->id); + taosMemoryFree(pFileState->cfName); tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr); tdListFreeP(pFileState->freeBuffs, destroyRowBuff); pFileState->stateBuffCleanupFn(pFileState->rowStateBuff); @@ -226,9 +249,6 @@ void streamFileStateClear(SStreamFileState* pFileState) { bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; } void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) { - if (pPos->needFree) { - putFreeBuff(pFileState, pPos); - } pPos->beUsed = used; } @@ -391,8 +411,19 @@ int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t ke return TSDB_CODE_FAILED; } +static void recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { + int32_t len = 0; + void* pBuff = NULL; + pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len); + memcpy(pPos->pRowBuff, pBuff, len); + taosMemoryFree(pBuff); +} + int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) { if (pPos->pRowBuff) { + if (pPos->needFree) { + recoverSessionRowBuff(pFileState, pPos); + } (*pVal) = pPos->pRowBuff; return TSDB_CODE_SUCCESS; } @@ -405,11 +436,7 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** ASSERT(pPos->pRowBuff); } - int32_t len = 0; - void* pBuff = NULL; - pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len); - memcpy(pPos->pRowBuff, pBuff, len); - taosMemoryFree(pBuff); + recoverSessionRowBuff(pFileState, pPos); (*pVal) = pPos->pRowBuff; tdListPrepend(pFileState->usedBuffs, &pPos); return TSDB_CODE_SUCCESS; @@ -452,7 +479,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, int32_t numOfElems = listNEles(pSnapshot); SListNode* pNode = NULL; - int idx = streamStateGetCfIdx(pFileState->pFileStore, "state"); + int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName); int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 1; char* buf = taosMemoryCalloc(1, len); @@ -471,12 +498,12 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, streamStateClearBatch(batch); } - SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number}; - code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, &sKey, pPos->pRowBuff, pFileState->rowSize, + void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number); + code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize, 0, buf); + taosMemoryFreeClear(pSKey); // todo handle failure memset(buf, 0, len); - // qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code); } taosMemoryFree(buf); @@ -637,7 +664,7 @@ bool isDeteled(SStreamFileState* pFileState, TSKEY ts) { } bool isFlushedState(SStreamFileState* pFileState, TSKEY ts) { - return ts < pFileState->flushMark; + return ts <= pFileState->flushMark; } int32_t getRowStateRowSize(SStreamFileState* pFileState) {