From c6d492d3fd3c2507d2a5d13e87bf6f4ddda48309 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 10 Apr 2024 17:41:59 +0800 Subject: [PATCH] set ts column index for function --- include/libs/stream/tstreamFileState.h | 9 +++++-- .../executor/src/streamcountwindowoperator.c | 3 +-- .../executor/src/streameventwindowoperator.c | 2 +- .../executor/src/streamtimewindowoperator.c | 24 +++---------------- source/libs/stream/src/streamSessionState.c | 7 ++++++ source/libs/stream/src/streamState.c | 12 +++++----- source/libs/stream/src/tstreamFileState.c | 10 +++++++- 7 files changed, 34 insertions(+), 33 deletions(-) diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index a9a198d194..68b9c4baa2 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -31,7 +31,6 @@ typedef struct SStreamFileState SStreamFileState; typedef SList SStreamSnapshot; typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen); -typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen); typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen); typedef int32_t (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos); typedef void (*_state_buff_cleanup_fn)(void* pRowBuff); @@ -41,6 +40,8 @@ typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const voi typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen); typedef int32_t (*_state_file_clear_fn)(SStreamState* pState); +typedef int32_t (*_state_fun_get_fn) (SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); + typedef int32_t (*range_cmpr_fn)(const SSessionKey* pWin1, const SSessionKey* pWin2); SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, @@ -64,7 +65,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId); int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); -int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState); +int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState); void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts); void* getRowStateBuff(SStreamFileState* pFileState); @@ -105,6 +106,10 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen); int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen); +//function +int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); +int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index db06775a18..224808b41a 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -657,6 +657,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys goto _error; } + pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); @@ -675,8 +676,6 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId; - pInfo->binfo.pRes = pResBlock; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index c948b57534..eef002ff35 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -709,6 +709,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys goto _error; } + pInfo->primaryTsIndex = tsSlotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); @@ -716,7 +717,6 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys goto _error; } - pInfo->primaryTsIndex = tsSlotId; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 86d428cddf..98b04af525 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1730,25 +1730,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pSup->pResultRows = tSimpleHashInit(32, hashFn); - int32_t pageSize = 4096; - while (pageSize < pSup->resultRowSize * 4) { - pageSize <<= 1u; - } - // at least four pages need to be in buffer - int32_t bufSize = 4096 * 256; - if (bufSize <= pageSize) { - bufSize = pageSize * 4; - } - - if (!osTempSpaceAvailable()) { - terrno = TSDB_CODE_NO_DISKSPACE; - qError("Init stream agg supporter failed since %s, tempDir:%s", terrstr(), tsTempDir); - return terrno; - } - - int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir); for (int32_t i = 0; i < numOfOutput; ++i) { - pExpSup->pCtx[i].saveHandle.pBuf = pSup->pResultBuf; + pExpSup->pCtx[i].saveHandle.pState = pSup->pState; } pSup->pSessionAPI = pApi; @@ -2948,6 +2931,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh .deleteMark = getDeleteMark(&pSessionNode->window, 0), }; + pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap, pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); @@ -2957,7 +2941,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; if (pSessionNode->window.pTsEnd) { pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId; } @@ -3843,14 +3826,13 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys } int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes; int16_t type = pColNode->node.resType.type; + pInfo->primaryTsIndex = tsSlotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } - - pInfo->primaryTsIndex = tsSlotId; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 295132a4f5..687b4bcf12 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -202,6 +202,13 @@ _end: return code; } +int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) { + SWinKey* pTmpkey = pKey; + ASSERT(keyLen == sizeof(SWinKey)); + SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts}; + return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen); +} + int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { SSHashObj* pSessionBuff = getRowStateBuff(pFileState); SSessionKey* pKey = pPos->pKey; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 18bc672c8d..1f46384448 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -277,10 +277,10 @@ int32_t streamStateCommit(SStreamState* pState) { int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { #ifdef USE_ROCKSDB void* pVal = NULL; - int32_t len = 0; - int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); + int32_t len = getRowStateRowSize(pState->pFileState); + int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); char* buf = ((SRowBuffPos*)pVal)->pRowBuff; - uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); + uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); memcpy(buf + len - rowSize, value, vLen); return code; #else @@ -290,10 +290,10 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) { #ifdef USE_ROCKSDB void* pVal = NULL; - int32_t len = 0; - int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); + int32_t len = getRowStateRowSize(pState->pFileState); + int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); char* buf = ((SRowBuffPos*)pVal)->pRowBuff; - uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); + uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); *ppVal = buf + len - rowSize; return code; #else diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index f86ab6b8a3..19f403a6a6 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -58,6 +58,8 @@ struct SStreamFileState { _state_file_remove_fn stateFileRemoveFn; _state_file_get_fn stateFileGetFn; _state_file_clear_fn stateFileClearFn; + + _state_fun_get_fn stateFunctionGetFn; }; typedef SRowBuffPos SRowBuffInfo; @@ -157,6 +159,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->stateFileGetFn = intervalFileGetFn; pFileState->stateFileClearFn = streamStateClear_rocksdb; pFileState->cfName = taosStrdup("state"); + pFileState->stateFunctionGetFn = getRowBuff; } else { pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn); pFileState->stateBuffCleanupFn = sessionWinStateCleanup; @@ -168,6 +171,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->stateFileGetFn = sessionFileGetFn; pFileState->stateFileClearFn = streamStateSessionClear_rocksdb; pFileState->cfName = taosStrdup("sess"); + pFileState->stateFunctionGetFn = getSessionRowBuff; } if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) { @@ -736,7 +740,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { return TSDB_CODE_SUCCESS; } -int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; } +int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; } void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) { pFileState->flushMark = TMAX(pFileState->flushMark, ts); @@ -754,3 +758,7 @@ bool isDeteled(SStreamFileState* pFileState, TSKEY ts) { bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); } int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; } + +int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) { + return pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen); +}