From 35a1d79b9829df25847092f7025cccde43a99e2c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 5 Nov 2024 10:37:39 +0800 Subject: [PATCH 1/3] fix(stream):check scan range of stream operator --- source/libs/executor/src/scanoperator.c | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 95846087d0..e75a9d59a4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2068,6 +2068,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr if (pSrcBlock->info.rows == 0) { return TSDB_CODE_SUCCESS; } + SSHashObj* pScanRange = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo; SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* startData = (TSKEY*)pStartTsCol->pData; @@ -2128,6 +2129,13 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]); continue; } + + SSessionKey checkKey = {.groupId = groupId, .win.skey = startWin.win.skey, .win.ekey = endWin.win.ekey}; + if (tSimpleHashGet(pScanRange, &checkKey, sizeof(SSessionKey)) != NULL) { + continue; + } + tSimpleHashPut(pScanRange, &checkKey, sizeof(SSessionKey), NULL, 0); + code = colDataSetVal(pDestStartCol, i, (const char*)&startWin.win.skey, false); QUERY_CHECK_CODE(code, lino, _end); @@ -2238,6 +2246,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS if (pSrcBlock->info.rows == 0) { return TSDB_CODE_SUCCESS; } + SSHashObj* pScanRange = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo; SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -2297,6 +2306,12 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS code = colDataSetVal(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false); QUERY_CHECK_CODE(code, lino, _end); + SSessionKey checkKey = {.groupId = groupId, .win = win}; + if (tSimpleHashGet(pScanRange, &checkKey, sizeof(SSessionKey)) != NULL) { + continue; + } + tSimpleHashPut(pScanRange, &checkKey, sizeof(SSessionKey), NULL, 0); + code = colDataSetVal(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false); QUERY_CHECK_CODE(code, lino, _end); @@ -2310,6 +2325,9 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS } _end: + if (pScanRange != NULL) { + tSimpleHashCleanup(pScanRange); + } if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } From ecf8f92ac833f8379f021d61e16680480d076f82 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 5 Nov 2024 11:11:23 +0800 Subject: [PATCH 2/3] fix ci issue --- source/libs/executor/src/scanoperator.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e75a9d59a4..5135c368b3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2134,7 +2134,8 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr if (tSimpleHashGet(pScanRange, &checkKey, sizeof(SSessionKey)) != NULL) { continue; } - tSimpleHashPut(pScanRange, &checkKey, sizeof(SSessionKey), NULL, 0); + code = tSimpleHashPut(pScanRange, &checkKey, sizeof(SSessionKey), NULL, 0); + QUERY_CHECK_CODE(code, lino, _end); code = colDataSetVal(pDestStartCol, i, (const char*)&startWin.win.skey, false); QUERY_CHECK_CODE(code, lino, _end); @@ -2310,7 +2311,8 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS if (tSimpleHashGet(pScanRange, &checkKey, sizeof(SSessionKey)) != NULL) { continue; } - tSimpleHashPut(pScanRange, &checkKey, sizeof(SSessionKey), NULL, 0); + code = tSimpleHashPut(pScanRange, &checkKey, sizeof(SSessionKey), NULL, 0); + QUERY_CHECK_CODE(code, lino, _end); code = colDataSetVal(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false); QUERY_CHECK_CODE(code, lino, _end); From d010323eb6f3cb6344fa14ce3a5a0d51af3ab779 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 5 Nov 2024 13:15:32 +0800 Subject: [PATCH 3/3] fix ci issue --- source/libs/executor/src/scanoperator.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5135c368b3..cda4084ec9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2153,6 +2153,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr } _end: + tSimpleHashCleanup(pScanRange); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } @@ -2327,9 +2328,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS } _end: - if (pScanRange != NULL) { - tSimpleHashCleanup(pScanRange); - } + tSimpleHashCleanup(pScanRange); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); }