From 4e6b898c9a0f31286498c3a7fdc6efb11e77b5a5 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 18 Jul 2024 10:37:14 +0800 Subject: [PATCH] adj stream operator result --- include/libs/executor/storageapi.h | 2 +- include/libs/stream/streamState.h | 2 +- include/libs/stream/tstreamFileState.h | 5 +- source/libs/executor/inc/executorInt.h | 2 +- .../executor/src/streameventwindowoperator.c | 4 +- .../executor/src/streamtimewindowoperator.c | 53 +++++++++++++------ source/libs/stream/src/streamSessionState.c | 8 +-- source/libs/stream/src/streamState.c | 19 +++++-- source/libs/stream/src/tstreamFileState.c | 29 ++++++++-- 9 files changed, 90 insertions(+), 34 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index e91b7c71ea..8afe55b0a1 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -367,7 +367,7 @@ typedef struct SStateStore { int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen, int32_t* pWinCode); int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen); - int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); + int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode); void (*streamStateSessionDel)(SStreamState* pState, const SSessionKey* key); void (*streamStateSessionReset)(SStreamState* pState, void* pVal); void (*streamStateSessionClear)(SStreamState* pState); diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 06da578749..46874b7c65 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -54,7 +54,7 @@ int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, voi int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen, int32_t* pWinCode); int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen); -int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode); void streamStateSessionDel(SStreamState* pState, const SSessionKey* key); void streamStateSessionReset(SStreamState* pState, void* pVal); void streamStateSessionClear(SStreamState* pState); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 699e761f5a..8c005fa994 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -59,7 +59,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal); bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen); -void putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); +int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState); @@ -81,7 +81,8 @@ int32_t getRowStateRowSize(SStreamFileState* pFileState); int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen, int32_t* pWinCode); int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); -int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen); +int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen, + int32_t* pWinCode); int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen); void deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos); int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur, diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c9acead357..ec095a481f 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -955,7 +955,7 @@ void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, void doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite); void doBuildSessionResult(struct SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, SSDataBlock* pBlock); -void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo); +int32_t getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo); void getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin); int32_t compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup, diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index f79f29dc55..e5f76d4544 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -744,7 +744,9 @@ void streamEventReloadState(SOperatorInfo* pOperator) { SEventWindowInfo curInfo = {0}; qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, i); - getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo.winInfo); + code = getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo.winInfo); + TSDB_CHECK_CODE(code, lino, _end); + // event window has been deleted if (!IS_VALID_SESSION_WIN(curInfo.winInfo)) { continue; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index d495e53d2e..5a90207125 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -3389,11 +3389,16 @@ void resetWinRange(STimeWindow* winRange) { winRange->ekey = INT64_MAX; } -void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo) { +int32_t getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; int32_t rowSize = pAggSup->resultRowSize; - int32_t code = - pAggSup->stateStore.streamStateSessionGet(pAggSup->pState, pKey, (void**)&pWinInfo->pStatePos, &rowSize); - if (code == TSDB_CODE_SUCCESS) { + int32_t winCode = TSDB_CODE_SUCCESS; + code = pAggSup->stateStore.streamStateSessionGet(pAggSup->pState, pKey, (void**)&pWinInfo->pStatePos, &rowSize, + &winCode); + TSDB_CHECK_CODE(code, lino, _end); + + if (winCode == TSDB_CODE_SUCCESS) { pWinInfo->sessionWin = *pKey; pWinInfo->isOutput = true; if (pWinInfo->pStatePos->needFree) { @@ -3402,6 +3407,12 @@ void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, } else { SET_SESSION_WIN_INVALID((*pWinInfo)); } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } void reloadAggSupFromDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup) { @@ -3435,7 +3446,8 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) { ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; - getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo); + code = getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo); + TSDB_CHECK_CODE(code, lino, _end); if (!IS_VALID_SESSION_WIN(winInfo)) { continue; } @@ -3488,7 +3500,8 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { } for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; - getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo); + code = getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo); + TSDB_CHECK_CODE(code, lino, _end); if (!IS_VALID_SESSION_WIN(winInfo)) { continue; } @@ -3946,14 +3959,19 @@ bool compareWinStateKey(SStateKeys* left, SStateKeys* right) { return compareVal(left->pData, right); } -void getStateWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SStateWindowInfo* pCurWin, - SStateWindowInfo* pNextWin) { - int32_t size = pAggSup->resultRowSize; +int32_t getStateWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SStateWindowInfo* pCurWin, + SStateWindowInfo* pNextWin) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamStateCur* pCur = NULL; + int32_t size = pAggSup->resultRowSize; pCurWin->winInfo.sessionWin.groupId = pKey->groupId; pCurWin->winInfo.sessionWin.win.skey = pKey->win.skey; pCurWin->winInfo.sessionWin.win.ekey = pKey->win.ekey; - getSessionWindowInfoByKey(pAggSup, pKey, &pCurWin->winInfo); + code = getSessionWindowInfoByKey(pAggSup, pKey, &pCurWin->winInfo); + TSDB_CHECK_CODE(code, lino, _end); ASSERT(IS_VALID_SESSION_WIN(pCurWin->winInfo)); + pCurWin->pStateKey = (SStateKeys*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize)); pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); @@ -3969,12 +3987,11 @@ void getStateWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SS pCurWin->winInfo.sessionWin.win.ekey); pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin; - SStreamStateCur* pCur = - pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin); + pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin); int32_t nextSize = pAggSup->resultRowSize; - int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, - (void**)&pNextWin->winInfo.pStatePos, &nextSize); - if (code != TSDB_CODE_SUCCESS) { + int32_t winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, + (void**)&pNextWin->winInfo.pStatePos, &nextSize); + if (winCode != TSDB_CODE_SUCCESS) { SET_SESSION_WIN_INVALID(pNextWin->winInfo); } else { pNextWin->pStateKey = @@ -3985,9 +4002,15 @@ void getStateWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SS pNextWin->pStateKey->isNull = false; pNextWin->winInfo.isOutput = true; } + +_end: pAggSup->stateStore.streamStateFreeCur(pCur); qDebug("===stream===get state next win buff. skey:%" PRId64 ", endkey:%" PRId64, pNextWin->winInfo.sessionWin.win.skey, pNextWin->winInfo.sessionWin.win.ekey); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, char* pKeyData, diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index c3bd2d2422..2c30eff832 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -337,7 +337,7 @@ _end: return code; } -int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) { +int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); @@ -348,9 +348,9 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v pNewPos->needFree = true; pNewPos->beFlushed = true; void* pBuff = NULL; - int32_t winCode = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen); - if (winCode != TSDB_CODE_SUCCESS) { - return winCode; + (*pWinCode) = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen); + if ((*pWinCode) != TSDB_CODE_SUCCESS) { + goto _end; } memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); memcpy(pNewPos->pRowBuff, pBuff, *pVLen); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 7c1d3a8baa..ce4b1800fb 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -353,22 +353,33 @@ void streamStateFreeVal(void* val) { taosMemoryFree(val); } int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SRowBuffPos* pos = (SRowBuffPos*)value; if (pos->needFree) { if (isFlushedState(pState->pFileState, key->win.ekey, 0)) { if (!pos->pRowBuff) { - return code; + goto _end; } code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen); + TSDB_CHECK_CODE(code, lino, _end); + streamStateReleaseBuf(pState, pos, true); - putFreeBuff(pState->pFileState, pos); + code = putFreeBuff(pState->pFileState, pos); + TSDB_CHECK_CODE(code, lino, _end); + stDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey, key->win.ekey, key->groupId, code); } else { pos->beFlushed = false; code = putSessionWinResultBuff(pState->pFileState, value); + TSDB_CHECK_CODE(code, lino, _end); } } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } return code; } @@ -377,8 +388,8 @@ int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStre return allocSessioncWinBuffByNextPosition(pState->pFileState, pCur, pKey, pVal, pVLen); } -int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { - return getSessionFlushedBuff(pState->pFileState, key, pVal, pVLen); +int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) { + return getSessionFlushedBuff(pState->pFileState, key, pVal, pVLen, pWinCode); } void streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 970f6e9bfc..76d3f5e795 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -124,7 +124,8 @@ static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) { *pLen = sizeof(TSKEY); (*pVal) = taosMemoryCalloc(1, *pLen); void* buff = *pVal; - taosEncodeFixedI64(&buff, *pKey); + int32_t tmp = taosEncodeFixedI64(&buff, *pKey); + ASSERT(tmp == sizeof(TSKEY)); } SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, @@ -255,14 +256,25 @@ void streamFileStateDestroy(SStreamFileState* pFileState) { taosMemoryFree(pFileState); } -void putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { +int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (pPos->pRowBuff) { - tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff)); + code = tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff)); + TSDB_CHECK_CODE(code, lino, _end); pPos->pRowBuff = NULL; } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SListIter iter = {0}; tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD); @@ -270,7 +282,8 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { while ((pNode = tdListNext(&iter)) != NULL) { SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data); if (all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed)) { - putFreeBuff(pFileState, pPos); + code = putFreeBuff(pFileState, pPos); + TSDB_CHECK_CODE(code, lino, _end); if (!all) { pFileState->stateBuffRemoveByPosFn(pFileState, pPos); @@ -280,6 +293,11 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { taosMemoryFreeClear(tmp); } } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } } int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max) { @@ -392,7 +410,8 @@ int32_t flushRowBuff(SStreamFileState* pFileState) { SListNode* pNode = NULL; while ((pNode = tdListNext(&fIter)) != NULL) { SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; - putFreeBuff(pFileState, pPos); + code = putFreeBuff(pFileState, pPos); + TSDB_CHECK_CODE(code, lino, _end); } tdListFreeP(pFlushList, destroyRowBuffPosPtr);