From ae386c6e345a9d182ed9483d1047203e0968470b Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 29 Nov 2023 11:16:56 +0800 Subject: [PATCH] opt expired data --- include/libs/executor/storageapi.h | 2 + include/libs/stream/tstreamUpdate.h | 1 + source/dnode/snode/src/snodeInitApi.c | 1 + source/dnode/vnode/src/vnd/vnodeInitApi.c | 1 + source/libs/executor/inc/executorInt.h | 91 +++++----- source/libs/executor/src/scanoperator.c | 9 + .../executor/src/streameventwindowoperator.c | 3 +- .../executor/src/streamtimewindowoperator.c | 13 +- source/libs/stream/src/streamUpdate.c | 15 +- .../script/tsim/stream/ignoreExpiredData.sim | 164 +++++++++++++++++- 10 files changed, 245 insertions(+), 55 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 045f2bad70..1d40274504 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -351,6 +351,8 @@ typedef struct SStateStore { TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol); bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); + bool (*isIncrementalTimeStamp)(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); + void (*updateInfoDestroy)(SUpdateInfo* pInfo); void (*windowSBfDelete)(SUpdateInfo *pInfo, uint64_t count); void (*windowSBfAdd)(SUpdateInfo *pInfo, uint64_t count); diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index 41ada56904..af93c6ac01 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -55,6 +55,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *p int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo); void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count); void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count); +bool 
isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); #ifdef __cplusplus } diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index 570feffc14..d1e0aefca0 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -80,6 +80,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->updateInfoDestroy = updateInfoDestroy; pStore->windowSBfDelete = windowSBfDelete; pStore->windowSBfAdd = windowSBfAdd; + pStore->isIncrementalTimeStamp = isIncrementalTimeStamp; pStore->updateInfoInitP = updateInfoInitP; pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index a6673917bf..6584b7072f 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -189,6 +189,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->updateInfoDestroy = updateInfoDestroy; pStore->windowSBfDelete = windowSBfDelete; pStore->windowSBfAdd = windowSBfAdd; + pStore->isIncrementalTimeStamp = isIncrementalTimeStamp; pStore->updateInfoInitP = updateInfoInitP; pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index e29583d8fc..865fe98be9 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -338,18 +338,19 @@ enum { }; typedef struct SStreamAggSupporter { - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row - SSDataBlock* pScanBlock; - SStreamState* pState; - int64_t gap; // stream session window gap - SqlFunctionCtx* pDummyCtx; // for combine - SSHashObj* pResultRows; - int32_t stateKeySize; - int16_t stateKeyType; - SDiskbasedBuf* pResultBuf; - SStateStore stateStore; - STimeWindow winRange; - SStorageAPI* pSessionAPI; + int32_t 
resultRowSize; // the result buffer size for each result row, with the meta data size for each row + SSDataBlock* pScanBlock; + SStreamState* pState; + int64_t gap; // stream session window gap + SqlFunctionCtx* pDummyCtx; // for combine + SSHashObj* pResultRows; + int32_t stateKeySize; + int16_t stateKeyType; + SDiskbasedBuf* pResultBuf; + SStateStore stateStore; + STimeWindow winRange; + SStorageAPI* pSessionAPI; + struct SUpdateInfo* pUpdateInfo; } SStreamAggSupporter; typedef struct SWindowSupporter { @@ -502,38 +503,39 @@ typedef struct SOpCheckPointInfo { } SOpCheckPointInfo; typedef struct SStreamIntervalOperatorInfo { - SOptrBasicInfo binfo; // basic info - SAggSupporter aggSup; // aggregate supporter - SExprSupp scalarSupp; // supporter for perform scalar function - SGroupResInfo groupResInfo; // multiple results build supporter - SInterval interval; // interval info - int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. - STimeWindowAggSupp twAggSup; - bool invertible; - bool ignoreExpiredData; - bool ignoreExpiredDataSaved; - SArray* pDelWins; // SWinRes - int32_t delIndex; - SSDataBlock* pDelRes; - SPhysiNode* pPhyNode; // create new child - SHashObj* pPullDataMap; - SArray* pPullWins; // SPullWindowInfo - int32_t pullIndex; - SSDataBlock* pPullDataRes; - SArray* pChildren; - int32_t numOfChild; - SStreamState* pState; // void - SWinKey delKey; - uint64_t numOfDatapack; - SArray* pUpdated; - SSHashObj* pUpdatedMap; - int64_t dataVersion; - SStateStore stateStore; - bool recvGetAll; - SHashObj* pFinalPullDataMap; - SOpCheckPointInfo checkPointInfo; - bool reCkBlock; - SSDataBlock* pCheckpointRes; + SOptrBasicInfo binfo; // basic info + SAggSupporter aggSup; // aggregate supporter + SExprSupp scalarSupp; // supporter for perform scalar function + SGroupResInfo groupResInfo; // multiple results build supporter + SInterval interval; // interval info + int32_t primaryTsIndex; // primary time stamp slot id from result of 
downstream operator. + STimeWindowAggSupp twAggSup; + bool invertible; + bool ignoreExpiredData; + bool ignoreExpiredDataSaved; + SArray* pDelWins; // SWinRes + int32_t delIndex; + SSDataBlock* pDelRes; + SPhysiNode* pPhyNode; // create new child + SHashObj* pPullDataMap; + SArray* pPullWins; // SPullWindowInfo + int32_t pullIndex; + SSDataBlock* pPullDataRes; + SArray* pChildren; + int32_t numOfChild; + SStreamState* pState; // void + SWinKey delKey; + uint64_t numOfDatapack; + SArray* pUpdated; + SSHashObj* pUpdatedMap; + int64_t dataVersion; + SStateStore stateStore; + bool recvGetAll; + SHashObj* pFinalPullDataMap; + SOpCheckPointInfo checkPointInfo; + bool reCkBlock; + SSDataBlock* pCheckpointRes; + struct SUpdateInfo* pUpdateInfo; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { @@ -830,6 +832,7 @@ void compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeW SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap); int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI); void resetWinRange(STimeWindow* winRange); +bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts); int32_t encodeSSessionKey(void** buf, SSessionKey* key); void* decodeSSessionKey(void* buf, SSessionKey* key); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 448c585869..4c212afce8 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1619,6 +1619,15 @@ void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKE pBlock->info.rows++; } +bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts) { + bool isExpired = false; + bool isInc = pAPI->isIncrementalTimeStamp(pUpdateInfo, tableId, ts); + if (!isInc) { + isExpired = isOverdue(ts, pTwSup); + } + return isExpired; +} + static void 
checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) { if (out) { blockDataCleanup(pInfo->pUpdateDataRes); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 8029b9b156..a2394b4690 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -297,7 +297,8 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl int32_t rows = pSDataBlock->info.rows; blockDataEnsureCapacity(pAggSup->pScanBlock, rows); for (int32_t i = 0; i < rows; i += winRows) { - if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) { + if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, + &pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i])) { i++; continue; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 86d83e05ad..2df3213c39 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -450,6 +450,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt pScanInfo->interval = pInfo->interval; pScanInfo->twAggSup = pInfo->twAggSup; pScanInfo->pState = pInfo->pState; + pInfo->pUpdateInfo = pScanInfo->pUpdateInfo; } void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput, @@ -800,7 +801,9 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat } while (1) { bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); - if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_INTERVAL_OP(pOperator)) || + if ((!IS_FINAL_INTERVAL_OP(pOperator) && pInfo->ignoreExpiredData && + checkExpiredData(&pInfo->stateStore, pInfo->pUpdateInfo, &pInfo->twAggSup, pSDataBlock->info.id.uid, + 
nextWin.ekey)) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); if (startPos < 0) { @@ -1621,6 +1624,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin pScanInfo->igCheckUpdate); } pScanInfo->twAggSup = *pTwSup; + pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo; } static TSKEY sesionTs(void* pKey) { @@ -2015,7 +2019,9 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData TSKEY* endTsCols = (int64_t*)pEndTsCol->pData; for (int32_t i = 0; i < rows;) { - if (pInfo->ignoreExpiredData && isOverdue(endTsCols[i], &pInfo->twAggSup)) { + if (!IS_FINAL_SESSION_OP(pOperator) && pInfo->ignoreExpiredData && + checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, &pInfo->twAggSup, + pSDataBlock->info.id.uid, endTsCols[i])) { i++; continue; } @@ -3327,7 +3333,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl blockDataEnsureCapacity(pAggSup->pScanBlock, rows); SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId); for (int32_t i = 0; i < rows; i += winRows) { - if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup) || colDataIsNull_s(pKeyColInfo, i)) { + if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, + &pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i]) || colDataIsNull_s(pKeyColInfo, i)) { i++; continue; } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 858667b563..f78f6f4df1 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -129,9 +129,9 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma } pInfo->numBuckets = DEFAULT_BUCKET_SIZE; pInfo->pCloseWinSBF = NULL; - _hash_fn_t 
hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); - pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK); } + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); + pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK); pInfo->maxDataVersion = 0; return pInfo; } @@ -384,3 +384,14 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { tDecoderClear(&decoder); return 0; } + +bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { + TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); + bool res = true; + if ( pMapMaxTs && ts < *pMapMaxTs ) { + res = false; + } else { + taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY)); + } + return res; +} diff --git a/tests/script/tsim/stream/ignoreExpiredData.sim b/tests/script/tsim/stream/ignoreExpiredData.sim index 884b7cbb5f..864f8caea5 100644 --- a/tests/script/tsim/stream/ignoreExpiredData.sim +++ b/tests/script/tsim/stream/ignoreExpiredData.sim @@ -107,7 +107,7 @@ sql select * from information_schema.ins_databases print ======database=$rows -sql use test1 +sql use test1; sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); sql create table ts1 using st tags(1,1,1); @@ -118,9 +118,9 @@ sql insert into ts1 values(1648791211000,1,2,3); sleep 1000 sql insert into ts1 values(1648791222001,2,2,3); sleep 1000 -sql insert into ts2 values(1648791211000,1,2,3); -sleep 1000 sql insert into ts2 values(1648791222001,2,2,3); +sleep 1000 +sql insert into ts2 values(1648791211000,1,2,3); $loop_count = 0 loop4: @@ -132,16 +132,26 @@ if $loop_count == 10 then return -1 endi -if $data01 != 2 then +if $data01 != 1 then print =====data01=$data01 goto loop4 endi -if $data02 != 2 then +if $data02 != 1 then print =====data02=$data02 goto loop4 endi +if $data11 != 2 then + print =====data11=$data11 + goto loop4 +endi + +if $data12 != 2 then + print 
=====data12=$data12 + goto loop4 +endi + $loop_count = 0 loop5: sleep 1000 @@ -162,4 +172,148 @@ if $data02 != 1 then goto loop5 endi +if $data11 != 2 then + print =====data11=$data11 + goto loop5 +endi + +if $data12 != 2 then + print =====data12=$data12 + goto loop5 +endi + + +print =============== create database test2 +sql create database test2 vgroups 4 +sql select * from information_schema.ins_databases + +print ======database=$rows + +sql use test2; + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create table ts3 using st tags(3,3,3); +sql create table ts4 using st tags(4,4,4); +sql create stream streams_21 trigger at_once IGNORE EXPIRED 1 into streamt_21 as select _wstart, count(*) c1 from st interval(10s) ; +sleep 1000 + +sql insert into ts1 values(1648791211000,1,2,3); +sql insert into ts1 values(1648791211001,2,2,3); +sql insert into ts1 values(1648791211002,2,2,3); +sql insert into ts1 values(1648791211003,2,2,3); +sql insert into ts1 values(1648791211004,2,2,3); + +sleep 1000 +sql insert into ts2 values(1648791201000,1,2,3); +sql insert into ts2 values(1648791201001,2,2,3); +sql insert into ts2 values(1648791201002,2,2,3); +sql insert into ts2 values(1648791201003,2,2,3); +sql insert into ts2 values(1648791201004,2,2,3); + +sleep 1000 +sql insert into ts2 values(1648791101000,1,2,3); +sql insert into ts2 values(1648791101001,2,2,3); +sql insert into ts2 values(1648791101002,2,2,3); +sql insert into ts2 values(1648791101003,2,2,3); +sql insert into ts2 values(1648791101004,2,2,3); + + +$loop_count = 0 +loop6: +sleep 1000 +print 1 select * from streamt_21; +sql select * from streamt_21; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows + goto loop6 +endi + +if $data01 != 5 then + print =====data01=$data01 + goto loop6 +endi + +if $data11 != 5 then + print 
=====data11=$data11 + goto loop6 +endi + +sleep 1000 +sql insert into ts3 values(1648791241000,1,2,3); + +sleep 1000 +sql insert into ts3 values(1648791231001,2,2,3); +sql insert into ts3 values(1648791231002,2,2,3); +sql insert into ts3 values(1648791231003,2,2,3); +sql insert into ts3 values(1648791231004,2,2,3); + +$loop_count = 0 +loop7: +sleep 1000 +print 2 select * from streamt_21; +sql select * from streamt_21; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 3 then + print =====rows=$rows + goto loop7 +endi + +if $data21 != 1 then + print =====data21=$data21 + goto loop7 +endi + +sleep 1000 +sql insert into ts4 values(1648791231001,2,2,3); +sql insert into ts4 values(1648791231002,2,2,3); +sql insert into ts4 values(1648791231003,2,2,3); +sql insert into ts4 values(1648791231004,2,2,3); + +sleep 1000 +sql insert into ts4 values(1648791211001,2,2,3); +sql insert into ts4 values(1648791211002,2,2,3); +sql insert into ts4 values(1648791211003,2,2,3); +sql insert into ts4 values(1648791211004,2,2,3); + +$loop_count = 0 +loop8: +sleep 1000 +print 3 select * from streamt_21; +sql select * from streamt_21; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 4 then + print =====rows=$rows + goto loop8 +endi + +if $data21 != 4 then + print =====data21=$data21 + goto loop8 +endi + +if $data31 != 1 then + print =====data31=$data31 + goto loop8 +endi + +print ============================end + system sh/stop_dnodes.sh