From 1aaf8731340c780f04de4af879b812f0596ce7fc Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Sat, 7 Oct 2023 17:43:50 +0800 Subject: [PATCH] fix issue --- include/libs/stream/tstreamFileState.h | 8 +- .../executor/src/streamtimewindowoperator.c | 73 +++++++++++++++---- source/libs/stream/src/streamSessionState.c | 27 ++++++- source/libs/stream/src/tstreamFileState.c | 51 ++++++++----- 4 files changed, 122 insertions(+), 37 deletions(-) diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index cc3d574a7f..2b567a7370 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -32,9 +32,10 @@ typedef SList SStreamSnapshot; typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen); typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen); -typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen, bool invalid); +typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen); +typedef int32_t (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos); typedef void (*_state_buff_cleanup_fn)(void* pRowBuff); -typedef void* (*_state_buff_create_statekeyfn)(SRowBuffPos* pPos, int64_t num); +typedef void* (*_state_buff_create_statekey_fn)(SRowBuffPos* pPos, int64_t num); typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const void* pKey); typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen); @@ -75,7 +76,8 @@ int32_t getRowStateRowSize(SStreamFileState* pFileState); int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen); int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen); -int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen, bool invalid); +int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen); +int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos); void sessionWinStateClear(SStreamFileState* pFileState); void sessionWinStateCleanup(void* pBuff); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index b3842243ed..f206ac5166 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2642,6 +2642,20 @@ void resetWinRange(STimeWindow* winRange) { winRange->ekey = INT64_MAX; } +void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo) { + int32_t rowSize = pAggSup->resultRowSize; + int32_t code = pAggSup->stateStore.streamStateSessionGet(pAggSup->pState, pKey, (void**)&pWinInfo->pStatePos, &rowSize); + if (code == TSDB_CODE_SUCCESS) { + pWinInfo->sessionWin = *pKey; + pWinInfo->isOutput = true; + if (pWinInfo->pStatePos->needFree) { + pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pWinInfo->sessionWin); + } + } else { + SET_SESSION_WIN_INVALID((*pWinInfo)); + } +} + void streamSessionSemiReloadState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; @@ -2657,7 +2671,7 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) { ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; - setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); + getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo); compactSessionSemiWindow(pOperator, &winInfo); saveSessionOutputBuf(pAggSup, &winInfo); } @@ -2672,17 +2686,6 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) { } } -void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo) { - int32_t rowSize = pAggSup->resultRowSize; - int32_t code = pAggSup->stateStore.streamStateSessionGet(pAggSup->pState, pKey, (void**)&pWinInfo->pStatePos, &rowSize); - if (code == TSDB_CODE_SUCCESS) { - pWinInfo->sessionWin = *pKey; - pWinInfo->isOutput = true; - } else { - SET_SESSION_WIN_INVALID((*pWinInfo)); - } -} - void streamSessionReloadState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; @@ -3061,6 +3064,50 @@ bool compareWinStateKey(SStateKeys* left, SStateKeys* right) { return compareVal(left->pData, right); } +void getStateWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SStateWindowInfo* pCurWin, + SStateWindowInfo* pNextWin) { + int32_t size = pAggSup->resultRowSize; + pCurWin->winInfo.sessionWin.groupId = pKey->groupId; + pCurWin->winInfo.sessionWin.win.skey = pKey->win.skey; + pCurWin->winInfo.sessionWin.win.ekey = pKey->win.ekey; + getSessionWindowInfoByKey(pAggSup, pKey, &pCurWin->winInfo); + ASSERT(IS_VALID_SESSION_WIN(pCurWin->winInfo)); + pCurWin->pStateKey = + (SStateKeys*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize)); + pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); + pCurWin->pStateKey->type = pAggSup->stateKeyType; + pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys); + pCurWin->pStateKey->isNull = false; + pCurWin->winInfo.isOutput = true; + if (pCurWin->winInfo.pStatePos->needFree) { + pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin); + } + + qDebug("===stream===get state cur win buff. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey, + pCurWin->winInfo.sessionWin.win.ekey); + + pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin; + SStreamStateCur* pCur = + pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin); + int32_t nextSize = pAggSup->resultRowSize; + int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, + (void**)&pNextWin->winInfo.pStatePos, &nextSize); + if (code != TSDB_CODE_SUCCESS) { + SET_SESSION_WIN_INVALID(pNextWin->winInfo); + } else { + pNextWin->pStateKey = + (SStateKeys*)((char*)pNextWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize)); + pNextWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); + pNextWin->pStateKey->type = pAggSup->stateKeyType; + pNextWin->pStateKey->pData = (char*)pNextWin->pStateKey + sizeof(SStateKeys); + pNextWin->pStateKey->isNull = false; + pNextWin->winInfo.isOutput = true; + } + pAggSup->stateStore.streamStateFreeCur(pCur); + qDebug("===stream===get state next win buff. skey:%" PRId64 ", endkey:%" PRId64, pNextWin->winInfo.sessionWin.win.skey, + pNextWin->winInfo.sessionWin.win.ekey); +} + void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, char* pKeyData, SStateWindowInfo* pCurWin, SStateWindowInfo* pNextWin) { int32_t size = pAggSup->resultRowSize; @@ -3535,7 +3582,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) { SStateWindowInfo dummy = {0}; qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, i); - setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo); + getStateWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo, &nextInfo); bool cpRes = compareWinStateKey(curInfo.pStateKey, nextInfo.pStateKey); qDebug("===stream=== reload state. next window info %" PRId64 ", %" PRIu64 ", compare:%d", nextInfo.winInfo.sessionWin.win.skey, nextInfo.winInfo.sessionWin.groupId, cpRes); diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index c4207e255c..8aedc41625 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -228,7 +228,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v return TSDB_CODE_SUCCESS; } -int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen, bool invalid) { +int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen) { SSHashObj* pSessionBuff = (SSHashObj*) pBuff; SSessionKey* pWinKey = (SSessionKey*) key; void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); @@ -242,9 +242,28 @@ int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen, b if (index >= 0) { SRowBuffPos* pPos = taosArrayGetP(pWinStates, index); if (inSessionWindow(pPos->pKey, pWinKey->win.skey, gap)) { - if (invalid) { - pPos->beFlushed = true; - } + pPos->beFlushed = true; + taosArrayRemove(pWinStates, index); + } + } + return TSDB_CODE_SUCCESS; +} + +int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) { + SSHashObj* pSessionBuff = getRowStateBuff(pFileState); + SSessionKey* pWinKey = (SSessionKey*) pPos->pKey; + void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); + if (!ppBuff) { + return TSDB_CODE_SUCCESS; + } + SArray* pWinStates = (SArray*)(*ppBuff); + int32_t size = taosArrayGetSize(pWinStates); + TSKEY gap = 0; + int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare); + if (index >= 0) { + SRowBuffPos* pItemPos = taosArrayGetP(pWinStates, index); + if (pItemPos == pPos) { + pItemPos->beFlushed = true; taosArrayRemove(pWinStates, index); } } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index daad14dcfc..4b6ea07047 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -46,9 +46,10 @@ struct SStreamFileState { char* id; char* cfName; - _state_buff_cleanup_fn stateBuffCleanupFn; - _state_buff_remove_fn stateBuffRemoveFn; - _state_buff_create_statekeyfn stateBuffCreateStateKeyFn; + _state_buff_cleanup_fn stateBuffCleanupFn; + _state_buff_remove_fn stateBuffRemoveFn; + _state_buff_remove_by_pos_fn stateBuffRemoveByPosFn; + _state_buff_create_statekey_fn stateBuffCreateStateKeyFn; _state_file_remove_fn stateFileRemoveFn; _state_file_get_fn stateFileGetFn; @@ -57,16 +58,25 @@ struct SStreamFileState { typedef SRowBuffPos SRowBuffInfo; -int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen, bool invalid) { - if (invalid) { - SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen); - if (pos) { - (*pos)->beFlushed = true; - } +int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) { + SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen); + if (pos) { + (*pos)->beFlushed = true; } return tSimpleHashRemove(pBuff, pKey, keyLen); } +int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) { + size_t keyLen = pFileState->rowSize; + SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen); + if (ppPos) { + if ((*ppPos) == pPos) { + return tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, keyLen); + } + } + return TSDB_CODE_SUCCESS; +} + void stateHashBuffClearFn(void* pBuff) { tSimpleHashClear(pBuff); } @@ -131,6 +141,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn); pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn; pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn; + pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn; pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey; pFileState->stateFileRemoveFn = intervalFileRemoveFn; @@ -140,7 +151,8 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ } else { pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn); pFileState->stateBuffCleanupFn = sessionWinStateCleanup; - pFileState->stateBuffRemoveFn = deleteSessionWinStateBuff; + pFileState->stateBuffRemoveFn = deleteSessionWinStateBuffFn; + pFileState->stateBuffRemoveByPosFn = deleteSessionWinStateBuffByPosFn; pFileState->stateBuffCreateStateKeyFn = sessionCreateStateKey; pFileState->stateFileRemoveFn = sessionFileRemoveFn; @@ -236,7 +248,7 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { putFreeBuff(pFileState, pPos); if (!all) { - pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen, false); + pFileState->stateBuffRemoveByPosFn(pFileState, pPos); } destroyRowBuffPos(pPos); tdListPopNode(pFileState->usedBuffs, pNode); @@ -256,7 +268,7 @@ void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushLi if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0) && !pPos->beUsed) { tdListAppend(pFlushList, &pPos); pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); - pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen, false); + pFileState->stateBuffRemoveByPosFn(pFileState, pPos); tdListPopNode(pFileState->usedBuffs, pNode); taosMemoryFreeClear(pNode); if (pPos->pRowBuff) { @@ -290,7 +302,7 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin if (pPos->beUsed == used) { tdListAppend(pFlushList, &pPos); pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); - pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen, false); + pFileState->stateBuffRemoveByPosFn(pFileState, pPos); tdListPopNode(pFileState->usedBuffs, pNode); taosMemoryFreeClear(pNode); if (pPos->pRowBuff) { @@ -433,7 +445,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi } int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) { - int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen, true); + int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen); int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) { return TSDB_CODE_SUCCESS; @@ -460,9 +472,14 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pPos->pRowBuff = getFreeBuff(pFileState); if (!pPos->pRowBuff) { - int32_t code = clearRowBuff(pFileState); - ASSERT(code == 0); - pPos->pRowBuff = getFreeBuff(pFileState); + if (pFileState->curRowCount < pFileState->maxRowCount) { + pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize); + pFileState->curRowCount++; + } else { + int32_t code = clearRowBuff(pFileState); + ASSERT(code == 0); + pPos->pRowBuff = getFreeBuff(pFileState); + } ASSERT(pPos->pRowBuff); }