From 34bf638f192cf33d6eaab2069fac25b51e906696 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 13 Apr 2023 16:21:57 +0800 Subject: [PATCH] fix bug --- include/libs/stream/tstreamUpdate.h | 6 +--- source/libs/executor/src/scanoperator.c | 21 +++++--------- source/libs/stream/src/streamState.c | 1 + source/libs/stream/src/streamUpdate.c | 38 ++----------------------- 4 files changed, 12 insertions(+), 54 deletions(-) diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index ab328c6ad5..4678aa0bd9 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -40,9 +40,7 @@ typedef struct SUpdateInfo { TSKEY minTS; SScalableBf *pCloseWinSBF; SHashObj *pMap; - STimeWindow scanWindow; - uint64_t scanGroupId; - uint64_t maxVersion; + uint64_t maxDataVersion; } SUpdateInfo; SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark); @@ -50,8 +48,6 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid); -void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version); -bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version); void updateInfoDestroy(SUpdateInfo *pInfo); void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8a3ac9d6a3..4b4cae1a55 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1022,8 +1022,9 @@ static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t grou pInfo->groupId = groupCol[rowIndex]; } -void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { +void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t version) { pTableScanInfo->base.cond.twindows = *pWin; + pTableScanInfo->base.cond.endVersion = version; pTableScanInfo->scanTimes = 0; pTableScanInfo->currentGroupId = -1; tsdbReaderClose(pTableScanInfo->base.dataReader); @@ -1142,7 +1143,7 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ break; } - resetTableScanInfo(pInfo->pTableScanOp->info, &win); + resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion); pInfo->pTableScanOp->status = OP_OPENED; return true; } @@ -1784,6 +1785,7 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) { if (pInfo->pUpdateInfo) { + pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version); checkUpdateData(pInfo, true, pBlock, true); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey); if (pInfo->pUpdateDataRes->info.rows > 0) { @@ -1845,7 +1847,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2; } - /*resetTableScanInfo(pTSInfo, pWin);*/ tsdbReaderClose(pTSInfo->base.dataReader); qDebug("4"); @@ -1891,8 +1892,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader); - updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); printDataBlock(pSDB, "scan recover update"); @@ -1961,6 +1960,9 @@ FETCH_NEXT_BLOCK: pBlock->info.calWin.skey = INT64_MIN; pBlock->info.calWin.ekey = INT64_MAX; pBlock->info.dataLoad = 1; + if (pInfo->pUpdateInfo) { + pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version); + } blockDataUpdateTsWindow(pBlock, 0); switch (pBlock->info.type) { case STREAM_NORMAL: @@ -2058,8 +2060,6 @@ FETCH_NEXT_BLOCK: SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader); - updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); printDataBlock(pSDB, "stream scan update"); @@ -2125,13 +2125,6 @@ FETCH_NEXT_BLOCK: setBlockIntoRes(pInfo, &block, false); - if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId, - pInfo->pRes->info.version)) { - printDataBlock(pInfo->pRes, "stream scan ignore"); - blockDataCleanup(pInfo->pRes); - continue; - } - if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; return pInfo->pCreateTbRes; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 85b176c684..4ead4c49b5 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1066,6 +1066,7 @@ void streamStateDestroy(SStreamState* pState) { #ifdef USE_ROCKSDB streamFileStateDestroy(pState->pFileState); streamStateDestroy_rocksdb(pState); + taosMemoryFreeClear(pState->parNameMap); // do nothong #endif taosMemoryFreeClear(pState->pTdbState); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index be12c72d00..70a1c543f6 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -128,9 +128,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma pInfo->pCloseWinSBF = NULL; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK); - pInfo->maxVersion = 0; - pInfo->scanGroupId = 0; - pInfo->scanWindow = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; + pInfo->maxDataVersion = 0; return pInfo; } @@ -242,29 +240,6 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { return true; } -void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) { - qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId, - pWin->skey, pWin->ekey, version); - pInfo->scanWindow = *pWin; - pInfo->scanGroupId = groupId; - pInfo->maxVersion = version; -} - -bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) { - if (!pInfo) { - return false; - } - qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId, - pWin->skey, pWin->ekey, version); - if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && pWin->ekey <= pInfo->scanWindow.ekey && - version <= pInfo->maxVersion) { - qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId, - pWin->skey, pWin->ekey, version); - return true; - } - return false; -} - void updateInfoDestroy(SUpdateInfo *pInfo) { if (pInfo == NULL) { return; @@ -337,10 +312,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1; } - if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1; - if (tEncodeI64(&encoder, pInfo->scanWindow.ekey) < 0) return -1; - if (tEncodeU64(&encoder, pInfo->scanGroupId) < 0) return -1; - if (tEncodeU64(&encoder, pInfo->maxVersion) < 0) return -1; + if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) return -1; tEndEncode(&encoder); @@ -393,11 +365,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY)); } ASSERT(mapSize == taosHashGetSize(pInfo->pMap)); - - if (tDecodeI64(&decoder, &pInfo->scanWindow.skey) < 0) return -1; - if (tDecodeI64(&decoder, &pInfo->scanWindow.ekey) < 0) return -1; - if (tDecodeU64(&decoder, &pInfo->scanGroupId) < 0) return -1; - if (tDecodeU64(&decoder, &pInfo->maxVersion) < 0) return -1; + if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1; tEndDecode(&decoder);