diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 95846087d0..cda4084ec9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2068,6 +2068,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr if (pSrcBlock->info.rows == 0) { return TSDB_CODE_SUCCESS; } + SSHashObj* pScanRange = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo; SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* startData = (TSKEY*)pStartTsCol->pData; @@ -2128,6 +2129,14 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]); continue; } + + SSessionKey checkKey = {.groupId = groupId, .win.skey = startWin.win.skey, .win.ekey = endWin.win.ekey}; + if (tSimpleHashGet(pScanRange, &checkKey, sizeof(SSessionKey)) != NULL) { + continue; + } + code = tSimpleHashPut(pScanRange, &checkKey, sizeof(SSessionKey), NULL, 0); + QUERY_CHECK_CODE(code, lino, _end); + code = colDataSetVal(pDestStartCol, i, (const char*)&startWin.win.skey, false); QUERY_CHECK_CODE(code, lino, _end); @@ -2144,6 +2153,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr } _end: + tSimpleHashCleanup(pScanRange); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } @@ -2238,6 +2248,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS if (pSrcBlock->info.rows == 0) { return TSDB_CODE_SUCCESS; } + SSHashObj* pScanRange = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo; SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -2297,6 +2308,13 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS code = colDataSetVal(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false); QUERY_CHECK_CODE(code, lino, _end); + SSessionKey checkKey = {.groupId = groupId, .win = win}; + if (tSimpleHashGet(pScanRange, &checkKey, sizeof(SSessionKey)) != NULL) { + continue; + } + code = tSimpleHashPut(pScanRange, &checkKey, sizeof(SSessionKey), NULL, 0); + QUERY_CHECK_CODE(code, lino, _end); + code = colDataSetVal(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false); QUERY_CHECK_CODE(code, lino, _end); @@ -2310,6 +2328,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS } _end: + tSimpleHashCleanup(pScanRange); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); }