diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index b47a162a1a..10697e64b2 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -328,7 +328,7 @@ typedef struct SStateStore { int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal); int32_t (*streamStateDel)(SStreamState* pState, const SWinKey* key); int32_t (*streamStateClear)(SStreamState* pState); - void (*streamStateSetNumber)(SStreamState* pState, int32_t number); + void (*streamStateSetNumber)(SStreamState* pState, int32_t number, int32_t tsIdex); int32_t (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 0fa84c99c6..a1074d6901 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -29,6 +29,7 @@ struct SResultRowEntryInfo; struct SFunctionNode; typedef struct SScalarParam SScalarParam; +typedef struct SStreamState SStreamState; typedef struct SFuncExecEnv { int32_t calcMemSize; @@ -126,7 +127,7 @@ typedef struct SInputColumnInfoData { typedef struct SSerializeDataHandle { struct SDiskbasedBuf *pBuf; int32_t currentPage; - void *pState; + SStreamState *pState; } SSerializeDataHandle; // incremental state storage @@ -164,7 +165,7 @@ typedef struct STdbState { void *txn; } STdbState; -typedef struct { +struct SStreamState { STdbState *pTdbState; struct SStreamFileState *pFileState; int32_t number; @@ -173,7 +174,8 @@ typedef struct { int64_t streamId; int64_t streamBackendRid; int8_t dump; -} SStreamState; + int32_t tsIndex; +}; typedef struct SFunctionStateStore { int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen); diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index c603f9f5ac..f1c9d712e8 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -46,7 +46,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key); int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal); int32_t streamStateDel(SStreamState* pState, const SWinKey* key); int32_t streamStateClear(SStreamState* pState); -void streamStateSetNumber(SStreamState* pState, int32_t number); +void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex); int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index a9a198d194..68b9c4baa2 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -31,7 +31,6 @@ typedef struct SStreamFileState SStreamFileState; typedef SList SStreamSnapshot; typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen); -typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen); typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen); typedef int32_t (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos); typedef void (*_state_buff_cleanup_fn)(void* pRowBuff); @@ -41,6 +40,8 @@ typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const voi typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen); typedef int32_t (*_state_file_clear_fn)(SStreamState* pState); +typedef int32_t (*_state_fun_get_fn) (SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); + typedef int32_t (*range_cmpr_fn)(const SSessionKey* pWin1, const SSessionKey* pWin2); SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, @@ -64,7 +65,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId); int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); -int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState); +int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState); void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts); void* getRowStateBuff(SStreamFileState* pFileState); @@ -105,6 +106,10 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen); int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen); +//function +int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); +int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 5263546765..628311591a 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -897,7 +897,7 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, - SStorageAPI* pApi); + SStorageAPI* pApi, int32_t tsIndex); void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, STimeWindowAggSupp* pTwSup); void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 64806a1c72..7b25237aba 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2904,7 +2904,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pUpdateInfo = NULL; pInfo->pTableScanOp = pTableScanOp; if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) { - pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1); + pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1, pInfo->primaryTsIndex); } pInfo->readHandle = *pHandle; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 720734431f..224808b41a 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -657,9 +657,10 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys goto _error; } + pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -675,8 +676,6 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId; - pInfo->binfo.pRes = pResBlock; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index ef5c2572d9..eef002ff35 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -709,14 +709,14 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys goto _error; } + pInfo->primaryTsIndex = tsSlotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } - pInfo->primaryTsIndex = tsSlotId; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 51071e2b4a..98b04af525 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1522,7 +1522,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState); - pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); + pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { @@ -1705,7 +1705,7 @@ static TSKEY sesionTs(void* pKey) { int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, - SStorageAPI* pApi) { + SStorageAPI* pApi, int32_t tsIndex) { pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput); pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR); pSup->gap = gap; @@ -1721,7 +1721,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput); pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState)); *(pSup->pState) = *pState; - pSup->stateStore.streamStateSetNumber(pSup->pState, -1); + pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex); int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput); pSup->pState->pFileState = pSup->stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState, @@ -1730,25 +1730,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pSup->pResultRows = tSimpleHashInit(32, hashFn); - int32_t pageSize = 4096; - while (pageSize < pSup->resultRowSize * 4) { - pageSize <<= 1u; - } - // at least four pages need to be in buffer - int32_t bufSize = 4096 * 256; - if (bufSize <= pageSize) { - bufSize = pageSize * 4; - } - - if (!osTempSpaceAvailable()) { - terrno = TSDB_CODE_NO_DISKSPACE; - qError("Init stream agg supporter failed since %s, tempDir:%s", terrstr(), tsTempDir); - return terrno; - } - - int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir); for (int32_t i = 0; i < numOfOutput; ++i) { - pExpSup->pCtx[i].saveHandle.pBuf = pSup->pResultBuf; + pExpSup->pCtx[i].saveHandle.pState = pSup->pState; } pSup->pSessionAPI = pApi; @@ -2948,16 +2931,16 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh .deleteMark = getDeleteMark(&pSessionNode->window, 0), }; + pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap, pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; if (pSessionNode->window.pTsEnd) { pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId; } @@ -3175,7 +3158,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream } SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info; pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; - pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i); + pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i, pInfo->primaryTsIndex); taosArrayPush(pInfo->pChildren, &pChildOp); } } @@ -3843,14 +3826,13 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys } int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes; int16_t type = pColNode->node.resType.type; + pInfo->primaryTsIndex = tsSlotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, - GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } - - pInfo->primaryTsIndex = tsSlotId; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; @@ -4082,7 +4064,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); - pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); + pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index b709abc30b..5a792f6139 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3203,10 +3203,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* SWinKey key = {0}; if (pCtx->saveHandle.pBuf == NULL) { - SColumnInfoData* pColInfo = pCtx->input.pPTS; - if (!pColInfo || pColInfo->info.type != TSDB_DATA_TYPE_TIMESTAMP) { - pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0); - } + SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, pCtx->saveHandle.pState->tsIndex); ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); key.groupId = pSrcBlock->info.id.groupId; key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);; diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 295132a4f5..687b4bcf12 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -202,6 +202,13 @@ _end: return code; } +int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) { + SWinKey* pTmpkey = pKey; + ASSERT(keyLen == sizeof(SWinKey)); + SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts}; + return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen); +} + int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { SSHashObj* pSessionBuff = getRowStateBuff(pFileState); SSessionKey* pKey = pPos->pKey; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index b53dc9daa6..ad6f5d48dc 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -277,10 +277,10 @@ int32_t streamStateCommit(SStreamState* pState) { int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { #ifdef USE_ROCKSDB void* pVal = NULL; - int32_t len = 0; - int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); + int32_t len = getRowStateRowSize(pState->pFileState); + int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); char* buf = ((SRowBuffPos*)pVal)->pRowBuff; - uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); + uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); memcpy(buf + len - rowSize, value, vLen); return code; #else @@ -290,11 +290,12 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) { #ifdef USE_ROCKSDB void* pVal = NULL; - int32_t len = 0; - int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); + int32_t len = getRowStateRowSize(pState->pFileState); + int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); char* buf = ((SRowBuffPos*)pVal)->pRowBuff; - uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); + uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); *ppVal = buf + len - rowSize; + streamStateReleaseBuf(pState, pVal, false); return code; #else return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen); @@ -332,7 +333,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key) { int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) { int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal); - streamFileStateReleaseBuff(pState->pFileState, pos, false); + streamStateReleaseBuf(pState, pos, false); return code; } @@ -395,7 +396,10 @@ int32_t streamStateClear(SStreamState* pState) { #endif } -void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; } +void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex) { + pState->number = number; + pState->tsIndex = tsIdex; +} int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) { #ifdef USE_ROCKSDB diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index f86ab6b8a3..19f403a6a6 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -58,6 +58,8 @@ struct SStreamFileState { _state_file_remove_fn stateFileRemoveFn; _state_file_get_fn stateFileGetFn; _state_file_clear_fn stateFileClearFn; + + _state_fun_get_fn stateFunctionGetFn; }; typedef SRowBuffPos SRowBuffInfo; @@ -157,6 +159,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->stateFileGetFn = intervalFileGetFn; pFileState->stateFileClearFn = streamStateClear_rocksdb; pFileState->cfName = taosStrdup("state"); + pFileState->stateFunctionGetFn = getRowBuff; } else { pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn); pFileState->stateBuffCleanupFn = sessionWinStateCleanup; @@ -168,6 +171,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->stateFileGetFn = sessionFileGetFn; pFileState->stateFileClearFn = streamStateSessionClear_rocksdb; pFileState->cfName = taosStrdup("sess"); + pFileState->stateFunctionGetFn = getSessionRowBuff; } if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) { @@ -736,7 +740,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { return TSDB_CODE_SUCCESS; } -int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; } +int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; } void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) { pFileState->flushMark = TMAX(pFileState->flushMark, ts); @@ -754,3 +758,7 @@ bool isDeteled(SStreamFileState* pFileState, TSKEY ts) { bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); } int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; } + +int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) { + return pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen); +} diff --git a/tests/script/tsim/stream/basic5.sim b/tests/script/tsim/stream/basic5.sim index f507ab7d3b..7b5f587feb 100644 --- a/tests/script/tsim/stream/basic5.sim +++ b/tests/script/tsim/stream/basic5.sim @@ -372,17 +372,22 @@ print step4============= sql create database test6 vgroups 4; sql use test6; -sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int); +sql create stable st(ts timestamp,a int,b int,c int,d int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); sql create stream streams6 trigger at_once ignore expired 0 ignore update 0 into streamt6 as select _wstart, b, c,min(c), ta, tb from st interval(1s); +sql create stream streams7 trigger at_once ignore expired 0 ignore update 0 into streamt7 as select ts, max(c) from st interval(1s); +sql create stream streams8 trigger at_once ignore expired 0 ignore update 0 into streamt8 as select ts, b, c, last(c), ta, tb from st session(ts, 1s); +sql create stream streams9 trigger at_once ignore expired 0 ignore update 0 into streamt9 as select ts, b, c, last_row(c), ta, tb from st partition by tbname state_window(a); +sql create stream streams10 trigger at_once ignore expired 0 ignore update 0 into streamt10 as select ts, b, c, last(c), ta, tb from st partition by tbname event_window start with d = 0 end with d = 9; +sql create stream streams11 trigger at_once ignore expired 1 ignore update 0 watermark 100s into streamt11 as select ts, b, c, last(c), ta, tb from st partition by tbname count_window(2); sleep 1000 -sql insert into t1 values(1648791211000,1,2,3,1.0); -sql insert into t1 values(1648791213000,2,3,4,1.1); -sql insert into t2 values(1648791215000,3,4,5,1.1); -sql insert into t2 values(1648791217000,4,5,6,1.1); +sql insert into t1 values(1648791211000,1,2,3,0); +sql insert into t1 values(1648791213000,2,3,4,0); +sql insert into t2 values(1648791215000,3,4,5,0); +sql insert into t2 values(1648791217000,4,5,6,0); $loop_count = 0