set ts column index for function

This commit is contained in:
54liuyao 2024-04-10 15:35:25 +08:00
parent a3abc775ea
commit e161556f51
11 changed files with 33 additions and 26 deletions

View File

@ -328,7 +328,7 @@ typedef struct SStateStore {
int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal);
int32_t (*streamStateDel)(SStreamState* pState, const SWinKey* key);
int32_t (*streamStateClear)(SStreamState* pState);
void (*streamStateSetNumber)(SStreamState* pState, int32_t number);
void (*streamStateSetNumber)(SStreamState* pState, int32_t number, int32_t tsIdex);
int32_t (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);

View File

@ -29,6 +29,7 @@ struct SResultRowEntryInfo;
struct SFunctionNode;
typedef struct SScalarParam SScalarParam;
typedef struct SStreamState SStreamState;
typedef struct SFuncExecEnv {
int32_t calcMemSize;
@ -126,7 +127,7 @@ typedef struct SInputColumnInfoData {
typedef struct SSerializeDataHandle {
struct SDiskbasedBuf *pBuf;
int32_t currentPage;
void *pState;
SStreamState *pState;
} SSerializeDataHandle;
// incremental state storage
@ -164,7 +165,7 @@ typedef struct STdbState {
void *txn;
} STdbState;
typedef struct {
struct SStreamState {
STdbState *pTdbState;
struct SStreamFileState *pFileState;
int32_t number;
@ -173,7 +174,8 @@ typedef struct {
int64_t streamId;
int64_t streamBackendRid;
int8_t dump;
} SStreamState;
int32_t tsIndex;
};
typedef struct SFunctionStateStore {
int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen);

View File

@ -46,7 +46,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key);
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal);
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
int32_t streamStateClear(SStreamState* pState);
void streamStateSetNumber(SStreamState* pState, int32_t number);
void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex);
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);

View File

@ -897,7 +897,7 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo*
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
SStorageAPI* pApi);
SStorageAPI* pApi, int32_t tsIndex);
void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
STimeWindowAggSupp* pTwSup);
void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);

View File

@ -2903,7 +2903,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pUpdateInfo = NULL;
pInfo->pTableScanOp = pTableScanOp;
if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1, pInfo->primaryTsIndex);
}
pInfo->readHandle = *pHandle;

View File

@ -659,7 +659,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0,
pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}

View File

@ -711,7 +711,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}

View File

@ -1522,7 +1522,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState);
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pInfo->pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
@ -1705,7 +1705,7 @@ static TSKEY sesionTs(void* pKey) {
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
SStorageAPI* pApi) {
SStorageAPI* pApi, int32_t tsIndex) {
pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput);
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
pSup->gap = gap;
@ -1721,7 +1721,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput);
pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pSup->pState) = *pState;
pSup->stateStore.streamStateSetNumber(pSup->pState, -1);
pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex);
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
pSup->pState->pFileState = pSup->stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState,
@ -2950,7 +2950,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap,
pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle,
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -3175,7 +3175,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
}
SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i);
pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i, pInfo->primaryTsIndex);
taosArrayPush(pInfo->pChildren, &pChildOp);
}
}
@ -3845,7 +3845,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
int16_t type = pColNode->node.resType.type;
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -4082,7 +4082,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState,

View File

@ -3203,10 +3203,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
SWinKey key = {0};
if (pCtx->saveHandle.pBuf == NULL) {
SColumnInfoData* pColInfo = pCtx->input.pPTS;
if (!pColInfo || pColInfo->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0);
}
SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, pCtx->saveHandle.pState->tsIndex);
ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
key.groupId = pSrcBlock->info.id.groupId;
key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);;

View File

@ -395,7 +395,10 @@ int32_t streamStateClear(SStreamState* pState) {
#endif
}
void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }
void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex) {
pState->number = number;
pState->tsIndex = tsIdex;
}
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
#ifdef USE_ROCKSDB

View File

@ -372,17 +372,22 @@ print step4=============
sql create database test6 vgroups 4;
sql use test6;
sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int);
sql create stable st(ts timestamp,a int,b int,c int,d int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams6 trigger at_once ignore expired 0 ignore update 0 into streamt6 as select _wstart, b, c,min(c), ta, tb from st interval(1s);
sql create stream streams7 trigger at_once ignore expired 0 ignore update 0 into streamt7 as select ts, max(c) from st interval(1s);
sql create stream streams8 trigger at_once ignore expired 0 ignore update 0 into streamt8 as select ts, b, c, last(c), ta, tb from st session(ts, 1s);
sql create stream streams9 trigger at_once ignore expired 0 ignore update 0 into streamt9 as select ts, b, c, last_row(c), ta, tb from st partition by tbname state_window(a);
sql create stream streams10 trigger at_once ignore expired 0 ignore update 0 into streamt10 as select ts, b, c, last(c), ta, tb from st partition by tbname event_window start with d = 0 end with d = 9;
sql create stream streams11 trigger at_once ignore expired 1 ignore update 0 watermark 100s into streamt11 as select ts, b, c, last(c), ta, tb from st partition by tbname count_window(2);
sleep 1000
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791213000,2,3,4,1.1);
sql insert into t2 values(1648791215000,3,4,5,1.1);
sql insert into t2 values(1648791217000,4,5,6,1.1);
sql insert into t1 values(1648791211000,1,2,3,0);
sql insert into t1 values(1648791213000,2,3,4,0);
sql insert into t2 values(1648791215000,3,4,5,0);
sql insert into t2 values(1648791217000,4,5,6,0);
$loop_count = 0