opt expired data

This commit is contained in:
54liuyao 2023-11-29 11:16:56 +08:00
parent cfedb98dfd
commit ae386c6e34
10 changed files with 245 additions and 55 deletions

View File

@ -351,6 +351,8 @@ typedef struct SStateStore {
TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol);
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
bool (*isIncrementalTimeStamp)(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
void (*updateInfoDestroy)(SUpdateInfo* pInfo);
void (*windowSBfDelete)(SUpdateInfo *pInfo, uint64_t count);
void (*windowSBfAdd)(SUpdateInfo *pInfo, uint64_t count);

View File

@ -55,6 +55,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *p
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count);
void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count);
bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
#ifdef __cplusplus
}

View File

@ -80,6 +80,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->updateInfoDestroy = updateInfoDestroy;
pStore->windowSBfDelete = windowSBfDelete;
pStore->windowSBfAdd = windowSBfAdd;
pStore->isIncrementalTimeStamp = isIncrementalTimeStamp;
pStore->updateInfoInitP = updateInfoInitP;
pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF;

View File

@ -189,6 +189,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->updateInfoDestroy = updateInfoDestroy;
pStore->windowSBfDelete = windowSBfDelete;
pStore->windowSBfAdd = windowSBfAdd;
pStore->isIncrementalTimeStamp = isIncrementalTimeStamp;
pStore->updateInfoInitP = updateInfoInitP;
pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF;

View File

@ -350,6 +350,7 @@ typedef struct SStreamAggSupporter {
SStateStore stateStore;
STimeWindow winRange;
SStorageAPI* pSessionAPI;
struct SUpdateInfo* pUpdateInfo;
} SStreamAggSupporter;
typedef struct SWindowSupporter {
@ -534,6 +535,7 @@ typedef struct SStreamIntervalOperatorInfo {
SOpCheckPointInfo checkPointInfo;
bool reCkBlock;
SSDataBlock* pCheckpointRes;
struct SUpdateInfo* pUpdateInfo;
} SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo {
@ -830,6 +832,7 @@ void compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeW
SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap);
int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI);
void resetWinRange(STimeWindow* winRange);
bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts);
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
void* decodeSSessionKey(void* buf, SSessionKey* key);

View File

@ -1619,6 +1619,15 @@ void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKE
pBlock->info.rows++;
}
bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts) {
bool isExpired = false;
bool isInc = pAPI->isIncrementalTimeStamp(pUpdateInfo, tableId, ts);
if (!isInc) {
isExpired = isOverdue(ts, pTwSup);
}
return isExpired;
}
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
if (out) {
blockDataCleanup(pInfo->pUpdateDataRes);

View File

@ -297,7 +297,8 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
int32_t rows = pSDataBlock->info.rows;
blockDataEnsureCapacity(pAggSup->pScanBlock, rows);
for (int32_t i = 0; i < rows; i += winRows) {
if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) {
if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo,
&pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i])) {
i++;
continue;
}

View File

@ -450,6 +450,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt
pScanInfo->interval = pInfo->interval;
pScanInfo->twAggSup = pInfo->twAggSup;
pScanInfo->pState = pInfo->pState;
pInfo->pUpdateInfo = pScanInfo->pUpdateInfo;
}
void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput,
@ -800,7 +801,9 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
}
while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_INTERVAL_OP(pOperator)) ||
if ((!IS_FINAL_INTERVAL_OP(pOperator) && pInfo->ignoreExpiredData &&
checkExpiredData(&pInfo->stateStore, pInfo->pUpdateInfo, &pInfo->twAggSup, pSDataBlock->info.id.uid,
nextWin.ekey)) ||
!inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
@ -1621,6 +1624,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
pScanInfo->igCheckUpdate);
}
pScanInfo->twAggSup = *pTwSup;
pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
}
static TSKEY sesionTs(void* pKey) {
@ -2015,7 +2019,9 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
TSKEY* endTsCols = (int64_t*)pEndTsCol->pData;
for (int32_t i = 0; i < rows;) {
if (pInfo->ignoreExpiredData && isOverdue(endTsCols[i], &pInfo->twAggSup)) {
if (!IS_FINAL_SESSION_OP(pOperator) && pInfo->ignoreExpiredData &&
checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, &pInfo->twAggSup,
pSDataBlock->info.id.uid, endTsCols[i])) {
i++;
continue;
}
@ -3327,7 +3333,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
blockDataEnsureCapacity(pAggSup->pScanBlock, rows);
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
for (int32_t i = 0; i < rows; i += winRows) {
if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup) || colDataIsNull_s(pKeyColInfo, i)) {
if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo,
&pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i]) || colDataIsNull_s(pKeyColInfo, i)) {
i++;
continue;
}

View File

@ -129,9 +129,9 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
}
pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
pInfo->pCloseWinSBF = NULL;
}
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
}
pInfo->maxDataVersion = 0;
return pInfo;
}
@ -384,3 +384,14 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
tDecoderClear(&decoder);
return 0;
}
bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
bool res = true;
if ( pMapMaxTs && ts < *pMapMaxTs ) {
res = false;
} else {
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
}
return res;
}

View File

@ -107,7 +107,7 @@ sql select * from information_schema.ins_databases
print ======database=$rows
sql use test1
sql use test1;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
@ -118,9 +118,9 @@ sql insert into ts1 values(1648791211000,1,2,3);
sleep 1000
sql insert into ts1 values(1648791222001,2,2,3);
sleep 1000
sql insert into ts2 values(1648791211000,1,2,3);
sleep 1000
sql insert into ts2 values(1648791222001,2,2,3);
sleep 1000
sql insert into ts2 values(1648791211000,1,2,3);
$loop_count = 0
loop4:
@ -132,16 +132,26 @@ if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
if $data01 != 1 then
print =====data01=$data01
goto loop4
endi
if $data02 != 2 then
if $data02 != 1 then
print =====data02=$data02
goto loop4
endi
if $data11 != 2 then
print =====data11=$data11
goto loop4
endi
if $data12 != 2 then
print =====data12=$data12
goto loop4
endi
$loop_count = 0
loop5:
sleep 1000
@ -162,4 +172,148 @@ if $data02 != 1 then
goto loop5
endi
if $data11 != 2 then
print =====data11=$data11
goto loop4
endi
if $data12 != 2 then
print =====data12=$data12
goto loop4
endi
print =============== create database test2
sql create database test2 vgroups 4
sql select * from information_schema.ins_databases
print ======database=$rows
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create table ts3 using st tags(3,3,3);
sql create table ts4 using st tags(4,4,4);
sql create stream streams_21 trigger at_once IGNORE EXPIRED 1 into streamt_21 as select _wstart, count(*) c1 from st interval(10s) ;
sleep 1000
sql insert into ts1 values(1648791211000,1,2,3);
sql insert into ts1 values(1648791211001,2,2,3);
sql insert into ts1 values(1648791211002,2,2,3);
sql insert into ts1 values(1648791211003,2,2,3);
sql insert into ts1 values(1648791211004,2,2,3);
sleep 1000
sql insert into ts2 values(1648791201000,1,2,3);
sql insert into ts2 values(1648791201001,2,2,3);
sql insert into ts2 values(1648791201002,2,2,3);
sql insert into ts2 values(1648791201003,2,2,3);
sql insert into ts2 values(1648791201004,2,2,3);
sleep 1000
sql insert into ts2 values(1648791101000,1,2,3);
sql insert into ts2 values(1648791101001,2,2,3);
sql insert into ts2 values(1648791101002,2,2,3);
sql insert into ts2 values(1648791101003,2,2,3);
sql insert into ts2 values(1648791101004,2,2,3);
$loop_count = 0
loop6:
sleep 1000
print 1 select * from streamt_21;
sql select * from streamt_21;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop6
endi
if $data01 != 5 then
print =====data01=$data01
goto loop6
endi
if $data11 != 5 then
print =====data11=$data11
goto loop6
endi
sleep 1000
sql insert into ts3 values(1648791241000,1,2,3);
sleep 1000
sql insert into ts3 values(1648791231001,2,2,3);
sql insert into ts3 values(1648791231002,2,2,3);
sql insert into ts3 values(1648791231003,2,2,3);
sql insert into ts3 values(1648791231004,2,2,3);
$loop_count = 0
loop7:
sleep 1000
print 2 select * from streamt_21;
sql select * from streamt_21;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 3 then
print =====rows=$rows
goto loop7
endi
if $data21 != 1 then
print =====data21=$data21
goto loop7
endi
sleep 1000
sql insert into ts4 values(1648791231001,2,2,3);
sql insert into ts4 values(1648791231002,2,2,3);
sql insert into ts4 values(1648791231003,2,2,3);
sql insert into ts4 values(1648791231004,2,2,3);
sleep 1000
sql insert into ts4 values(1648791211001,2,2,3);
sql insert into ts4 values(1648791211002,2,2,3);
sql insert into ts4 values(1648791211003,2,2,3);
sql insert into ts4 values(1648791211004,2,2,3);
$loop_count = 0
loop8:
sleep 1000
print 3 select * from streamt_21;
sql select * from streamt_21;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 4 then
print =====rows=$rows
goto loop8
endi
if $data21 != 4 then
print =====data21=$data21
goto loop8
endi
if $data31 != 1 then
print =====data31=$data31
goto loop8
endi
print ============================end
system sh/stop_dnodes.sh