From 32b59af4b18a19ae749042f55d4be91c9769c6d7 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 31 Jul 2023 17:04:41 +0800 Subject: [PATCH] check update data --- include/libs/executor/storageapi.h | 2 ++ include/libs/stream/tstreamUpdate.h | 2 ++ source/dnode/snode/src/snodeInitApi.c | 2 ++ source/dnode/vnode/src/vnd/vnodeInitApi.c | 2 ++ source/libs/executor/src/scanoperator.c | 4 +++- source/libs/stream/src/streamUpdate.c | 4 ++-- 6 files changed, 13 insertions(+), 3 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 2fbd7851e8..773f373a2d 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -368,6 +368,8 @@ typedef struct SStateStore { bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); void (*updateInfoDestroy)(SUpdateInfo* pInfo); + void (*windowSBfDelete)(SUpdateInfo *pInfo, uint64_t count); + void (*windowSBfAdd)(SUpdateInfo *pInfo, uint64_t count); SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark); void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo); diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index bd5a3be8de..7bb1d027c9 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -53,6 +53,8 @@ void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo); int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo); +void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count); +void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count); #ifdef __cplusplus } diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index c046505630..e737e3fa37 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -78,6 +78,8 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->updateInfoIsUpdated = updateInfoIsUpdated; pStore->updateInfoIsTableInserted = updateInfoIsTableInserted; pStore->updateInfoDestroy = updateInfoDestroy; + pStore->windowSBfDelete = windowSBfDelete; + pStore->windowSBfAdd = windowSBfAdd; pStore->updateInfoInitP = updateInfoInitP; pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 3dfaa28c09..5c8d563d73 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -180,6 +180,8 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->updateInfoIsUpdated = updateInfoIsUpdated; pStore->updateInfoIsTableInserted = updateInfoIsTableInserted; pStore->updateInfoDestroy = updateInfoDestroy; + pStore->windowSBfDelete = windowSBfDelete; + pStore->windowSBfAdd = windowSBfAdd; pStore->updateInfoInitP = updateInfoInitP; pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7434db61db..6331f4bd35 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2411,7 +2411,9 @@ void streamScanReloadState(SOperatorInfo* pOperator) { pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); pInfo->pUpdateInfo = pUpInfo; } else { - pInfo->pUpdateInfo->minTS = TMAX(pInfo->pUpdateInfo->minTS, pUpInfo->minTS); + pInfo->stateStore.windowSBfDelete(pInfo->pUpdateInfo, 1); + pInfo->stateStore.windowSBfAdd(pInfo->pUpdateInfo, 1); + ASSERT(pInfo->pUpdateInfo->minTS > pUpInfo->minTS); pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pUpInfo->maxDataVersion); SHashObj* curMap = pInfo->pUpdateInfo->pMap; void *pIte = taosHashIterate(curMap, NULL); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 85be120dbd..7a8de91d77 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -33,7 +33,7 @@ static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); } -static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { +void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { if (pInfo->numSBFs < count) { count = pInfo->numSBFs; } @@ -49,7 +49,7 @@ static void clearItemHelper(void *p) { tScalableBfDestroy(*pBf); } -static void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) { +void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) { if (count < pInfo->numSBFs) { for (uint64_t i = 0; i < count; ++i) { SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, 0);