From e161556f5184f5a9f50297811e798701132242b3 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 10 Apr 2024 15:35:25 +0800 Subject: [PATCH] set ts column index for function --- include/libs/executor/storageapi.h | 2 +- include/libs/function/function.h | 8 +++++--- include/libs/stream/streamState.h | 2 +- source/libs/executor/inc/executorInt.h | 2 +- source/libs/executor/src/scanoperator.c | 2 +- .../libs/executor/src/streamcountwindowoperator.c | 2 +- .../libs/executor/src/streameventwindowoperator.c | 2 +- .../libs/executor/src/streamtimewindowoperator.c | 14 +++++++------- source/libs/function/src/builtinsimpl.c | 5 +---- source/libs/stream/src/streamState.c | 5 ++++- tests/script/tsim/stream/basic5.sim | 15 ++++++++++----- 11 files changed, 33 insertions(+), 26 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index b47a162a1a..10697e64b2 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -328,7 +328,7 @@ typedef struct SStateStore { int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal); int32_t (*streamStateDel)(SStreamState* pState, const SWinKey* key); int32_t (*streamStateClear)(SStreamState* pState); - void (*streamStateSetNumber)(SStreamState* pState, int32_t number); + void (*streamStateSetNumber)(SStreamState* pState, int32_t number, int32_t tsIdex); int32_t (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 0fa84c99c6..a1074d6901 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -29,6 +29,7 @@ struct SResultRowEntryInfo; struct SFunctionNode; typedef struct SScalarParam SScalarParam; +typedef struct SStreamState SStreamState; typedef struct SFuncExecEnv { int32_t calcMemSize; @@ -126,7 +127,7 @@ typedef struct SInputColumnInfoData { typedef struct SSerializeDataHandle { struct SDiskbasedBuf *pBuf; int32_t currentPage; - void *pState; + SStreamState *pState; } SSerializeDataHandle; // incremental state storage @@ -164,7 +165,7 @@ typedef struct STdbState { void *txn; } STdbState; -typedef struct { +struct SStreamState { STdbState *pTdbState; struct SStreamFileState *pFileState; int32_t number; @@ -173,7 +174,8 @@ typedef struct { int64_t streamId; int64_t streamBackendRid; int8_t dump; -} SStreamState; + int32_t tsIndex; +}; typedef struct SFunctionStateStore { int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen); diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index c603f9f5ac..f1c9d712e8 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -46,7 +46,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key); int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal); int32_t streamStateDel(SStreamState* pState, const SWinKey* key); int32_t streamStateClear(SStreamState* pState); -void streamStateSetNumber(SStreamState* pState, int32_t number); +void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex); int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 5263546765..628311591a 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -897,7 +897,7 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, - SStorageAPI* pApi); + SStorageAPI* pApi, int32_t tsIndex); void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, STimeWindowAggSupp* pTwSup); void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a6613e589a..dc5ce60f95 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2903,7 +2903,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pUpdateInfo = NULL; pInfo->pTableScanOp = pTableScanOp; if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) { - pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1); + pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1, pInfo->primaryTsIndex); } pInfo->readHandle = *pHandle; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 720734431f..db06775a18 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -659,7 +659,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index ef5c2572d9..c948b57534 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -711,7 +711,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 51071e2b4a..86d428cddf 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1522,7 +1522,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState); - pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); + pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { @@ -1705,7 +1705,7 @@ static TSKEY sesionTs(void* pKey) { int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, - SStorageAPI* pApi) { + SStorageAPI* pApi, int32_t tsIndex) { pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput); pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR); pSup->gap = gap; @@ -1721,7 +1721,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput); pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState)); *(pSup->pState) = *pState; - pSup->stateStore.streamStateSetNumber(pSup->pState, -1); + pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex); int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput); pSup->pState->pFileState = pSup->stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState, @@ -2950,7 +2950,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap, pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -3175,7 +3175,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream } SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info; pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; - pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i); + pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i, pInfo->primaryTsIndex); taosArrayPush(pInfo->pChildren, &pChildOp); } } @@ -3845,7 +3845,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys int16_t type = pColNode->node.resType.type; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, - GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4082,7 +4082,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); - pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); + pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index b709abc30b..5a792f6139 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3203,10 +3203,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* SWinKey key = {0}; if (pCtx->saveHandle.pBuf == NULL) { - SColumnInfoData* pColInfo = pCtx->input.pPTS; - if (!pColInfo || pColInfo->info.type != TSDB_DATA_TYPE_TIMESTAMP) { - pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0); - } + SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, pCtx->saveHandle.pState->tsIndex); ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); key.groupId = pSrcBlock->info.id.groupId; key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index b53dc9daa6..18bc672c8d 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -395,7 +395,10 @@ int32_t streamStateClear(SStreamState* pState) { #endif } -void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; } +void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex) { + pState->number = number; + pState->tsIndex = tsIdex; +} int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) { #ifdef USE_ROCKSDB diff --git a/tests/script/tsim/stream/basic5.sim b/tests/script/tsim/stream/basic5.sim index f507ab7d3b..7b5f587feb 100644 --- a/tests/script/tsim/stream/basic5.sim +++ b/tests/script/tsim/stream/basic5.sim @@ -372,17 +372,22 @@ print step4============= sql create database test6 vgroups 4; sql use test6; -sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int); +sql create stable st(ts timestamp,a int,b int,c int,d int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); sql create stream streams6 trigger at_once ignore expired 0 ignore update 0 into streamt6 as select _wstart, b, c,min(c), ta, tb from st interval(1s); +sql create stream streams7 trigger at_once ignore expired 0 ignore update 0 into streamt7 as select ts, max(c) from st interval(1s); +sql create stream streams8 trigger at_once ignore expired 0 ignore update 0 into streamt8 as select ts, b, c, last(c), ta, tb from st session(ts, 1s); +sql create stream streams9 trigger at_once ignore expired 0 ignore update 0 into streamt9 as select ts, b, c, last_row(c), ta, tb from st partition by tbname state_window(a); +sql create stream streams10 trigger at_once ignore expired 0 ignore update 0 into streamt10 as select ts, b, c, last(c), ta, tb from st partition by tbname event_window start with d = 0 end with d = 9; +sql create stream streams11 trigger at_once ignore expired 1 ignore update 0 watermark 100s into streamt11 as select ts, b, c, last(c), ta, tb from st partition by tbname count_window(2); sleep 1000 -sql insert into t1 values(1648791211000,1,2,3,1.0); -sql insert into t1 values(1648791213000,2,3,4,1.1); -sql insert into t2 values(1648791215000,3,4,5,1.1); -sql insert into t2 values(1648791217000,4,5,6,1.1); +sql insert into t1 values(1648791211000,1,2,3,0); +sql insert into t1 values(1648791213000,2,3,4,0); +sql insert into t2 values(1648791215000,3,4,5,0); +sql insert into t2 values(1648791217000,4,5,6,0); $loop_count = 0