From 34bf638f192cf33d6eaab2069fac25b51e906696 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 13 Apr 2023 16:21:57 +0800 Subject: [PATCH 1/3] fix bug --- include/libs/stream/tstreamUpdate.h | 6 +--- source/libs/executor/src/scanoperator.c | 21 +++++--------- source/libs/stream/src/streamState.c | 1 + source/libs/stream/src/streamUpdate.c | 38 ++----------------------- 4 files changed, 12 insertions(+), 54 deletions(-) diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index ab328c6ad5..4678aa0bd9 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -40,9 +40,7 @@ typedef struct SUpdateInfo { TSKEY minTS; SScalableBf *pCloseWinSBF; SHashObj *pMap; - STimeWindow scanWindow; - uint64_t scanGroupId; - uint64_t maxVersion; + uint64_t maxDataVersion; } SUpdateInfo; SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark); @@ -50,8 +48,6 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid); -void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version); -bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version); void updateInfoDestroy(SUpdateInfo *pInfo); void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8a3ac9d6a3..4b4cae1a55 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1022,8 +1022,9 @@ static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t grou pInfo->groupId = groupCol[rowIndex]; } -void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { +void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t version) { pTableScanInfo->base.cond.twindows = *pWin; + pTableScanInfo->base.cond.endVersion = version; pTableScanInfo->scanTimes = 0; pTableScanInfo->currentGroupId = -1; tsdbReaderClose(pTableScanInfo->base.dataReader); @@ -1142,7 +1143,7 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ break; } - resetTableScanInfo(pInfo->pTableScanOp->info, &win); + resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion); pInfo->pTableScanOp->status = OP_OPENED; return true; } @@ -1784,6 +1785,7 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) { if (pInfo->pUpdateInfo) { + pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version); checkUpdateData(pInfo, true, pBlock, true); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey); if (pInfo->pUpdateDataRes->info.rows > 0) { @@ -1845,7 +1847,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2; } - /*resetTableScanInfo(pTSInfo, pWin);*/ tsdbReaderClose(pTSInfo->base.dataReader); qDebug("4"); @@ -1891,8 +1892,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader); - updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); printDataBlock(pSDB, "scan recover update"); @@ -1961,6 +1960,9 @@ FETCH_NEXT_BLOCK: pBlock->info.calWin.skey = INT64_MIN; pBlock->info.calWin.ekey = INT64_MAX; pBlock->info.dataLoad = 1; + if (pInfo->pUpdateInfo) { + pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version); + } blockDataUpdateTsWindow(pBlock, 0); switch (pBlock->info.type) { case STREAM_NORMAL: @@ -2058,8 +2060,6 @@ FETCH_NEXT_BLOCK: SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader); - updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); printDataBlock(pSDB, "stream scan update"); @@ -2125,13 +2125,6 @@ FETCH_NEXT_BLOCK: setBlockIntoRes(pInfo, &block, false); - if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId, - pInfo->pRes->info.version)) { - printDataBlock(pInfo->pRes, "stream scan ignore"); - blockDataCleanup(pInfo->pRes); - continue; - } - if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; return pInfo->pCreateTbRes; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 85b176c684..4ead4c49b5 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1066,6 +1066,7 @@ void streamStateDestroy(SStreamState* pState) { #ifdef USE_ROCKSDB streamFileStateDestroy(pState->pFileState); streamStateDestroy_rocksdb(pState); + taosMemoryFreeClear(pState->parNameMap); // do nothong #endif taosMemoryFreeClear(pState->pTdbState); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index be12c72d00..70a1c543f6 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -128,9 +128,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma pInfo->pCloseWinSBF = NULL; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK); - pInfo->maxVersion = 0; - pInfo->scanGroupId = 0; - pInfo->scanWindow = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; + pInfo->maxDataVersion = 0; return pInfo; } @@ -242,29 +240,6 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { return true; } -void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) { - qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId, - pWin->skey, pWin->ekey, version); - pInfo->scanWindow = *pWin; - pInfo->scanGroupId = groupId; - pInfo->maxVersion = version; -} - -bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) { - if (!pInfo) { - return false; - } - qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId, - pWin->skey, pWin->ekey, version); - if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && pWin->ekey <= pInfo->scanWindow.ekey && - version <= pInfo->maxVersion) { - qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId, - pWin->skey, pWin->ekey, version); - return true; - } - return false; -} - void updateInfoDestroy(SUpdateInfo *pInfo) { if (pInfo == NULL) { return; @@ -337,10 +312,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1; } - if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1; - if (tEncodeI64(&encoder, pInfo->scanWindow.ekey) < 0) return -1; - if (tEncodeU64(&encoder, pInfo->scanGroupId) < 0) return -1; - if (tEncodeU64(&encoder, pInfo->maxVersion) < 0) return -1; + if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) return -1; tEndEncode(&encoder); @@ -393,11 +365,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY)); } ASSERT(mapSize == taosHashGetSize(pInfo->pMap)); - - if (tDecodeI64(&decoder, &pInfo->scanWindow.skey) < 0) return -1; - if (tDecodeI64(&decoder, &pInfo->scanWindow.ekey) < 0) return -1; - if (tDecodeU64(&decoder, &pInfo->scanGroupId) < 0) return -1; - if (tDecodeU64(&decoder, &pInfo->maxVersion) < 0) return -1; + if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1; tEndDecode(&decoder); From cb0b4ffbd3bca71a6e92830bfdaac06e62204f22 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 14 Apr 2023 09:38:35 +0800 Subject: [PATCH 2/3] use interval state --- source/libs/executor/inc/executorimpl.h | 7 ++-- source/libs/executor/src/scanoperator.c | 34 ++----------------- source/libs/executor/src/timewindowoperator.c | 22 ++++++------ 3 files changed, 18 insertions(+), 45 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0adabed626..f503f9370b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -483,9 +483,10 @@ typedef struct SStreamScanInfo { int32_t blockRecoverTotCnt; SSDataBlock* pRecoverRes; - SSDataBlock* pCreateTbRes; - int8_t igCheckUpdate; - int8_t igExpired; + SSDataBlock* pCreateTbRes; + int8_t igCheckUpdate; + int8_t igExpired; + SStreamState* pState; } SStreamScanInfo; typedef struct { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4b4cae1a55..fbc835beea 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1447,39 +1447,8 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, return code; } -#if 0 -void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { - SExprSupp* pTagCalSup = &pInfo->tagCalSup; - SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState; - if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return; - if (pBlock == NULL || pBlock->info.rows == 0) return; - - void* tag = NULL; - int32_t tagLen = 0; - if (streamStateGetParTag(pState, pBlock->info.id.groupId, &tag, &tagLen) == 0) { - pBlock->info.tagLen = tagLen; - void* pTag = taosMemoryRealloc(pBlock->info.pTag, tagLen); - if (pTag == NULL) { - tdbFree(tag); - taosMemoryFree(pBlock->info.pTag); - pBlock->info.pTag = NULL; - pBlock->info.tagLen = 0; - return; - } - pBlock->info.pTag = pTag; - memcpy(pBlock->info.pTag, tag, tagLen); - tdbFree(tag); - return; - } else { - pBlock->info.pTag = NULL; - } - tdbFree(tag); -} -#endif - static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup; - SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState; blockDataCleanup(pInfo->pCreateTbRes); if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) { pBlock->info.parTbName[0] = 0; @@ -1535,7 +1504,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]); bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && isDeletedStreamWindow(&win, pBlock->info.id.groupId, - pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup); + pInfo->pState, &pInfo->twAggSup); if ((update || closedWin) && out) { qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin); uint64_t gpId = 0; @@ -2534,6 +2503,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate; pInfo->igExpired = pTableScanNode->igExpired; pInfo->twAggSup.maxTs = INT64_MIN; + pInfo->pState = NULL; // todo(liuyao) get buff from rocks db; void* buff = NULL; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 89bce58cce..8034b8f871 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -25,7 +25,9 @@ #include "ttime.h" #define IS_FINAL_OP(op) ((op)->isFinal) -#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); +// #define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); + +#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL); typedef struct SSessionAggOperatorInfo { SOptrBasicInfo binfo; @@ -1612,20 +1614,20 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt return needed; } -void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval, - STimeWindowAggSupp* pTwSup) { +void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) { if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - initIntervalDownStream(downstream->pDownstream[0], type, pSup, pInterval, pTwSup); + initIntervalDownStream(downstream->pDownstream[0], type, pInfo); return; } SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup.parentType = type; - pScanInfo->windowSup.pIntervalAggSup = pSup; + pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark); + pScanInfo->pUpdateInfo = updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark); } - pScanInfo->interval = *pInterval; - pScanInfo->twAggSup = *pTwSup; + pScanInfo->interval = pInfo->interval; + pScanInfo->twAggSup = pInfo->twAggSup; + pScanInfo->pState = pInfo->pState; } void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) { @@ -2761,7 +2763,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { - initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); + initIntervalDownStream(downstream, pPhyNode->type, pInfo); } code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -4930,7 +4932,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); - initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); + initIntervalDownStream(downstream, pPhyNode->type, pInfo); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; From 4906855a8b3ba77e004c9e251c35f9d1eeb0199e Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 14 Apr 2023 09:47:56 +0800 Subject: [PATCH 3/3] delete mark --- source/libs/executor/src/timewindowoperator.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8034b8f871..f85ec1fb2f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -25,9 +25,8 @@ #include "ttime.h" #define IS_FINAL_OP(op) ((op)->isFinal) -// #define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); +#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); -#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL); typedef struct SSessionAggOperatorInfo { SOptrBasicInfo binfo;