fix(stream):check scan range of stream operator

This commit is contained in:
54liuyao 2024-11-05 10:37:39 +08:00
parent bd41437ad2
commit 35a1d79b98
1 changed files with 18 additions and 0 deletions

View File

@ -2068,6 +2068,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
if (pSrcBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
SSHashObj* pScanRange = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
@ -2128,6 +2129,13 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]);
continue;
}
SSessionKey checkKey = {.groupId = groupId, .win.skey = startWin.win.skey, .win.ekey = endWin.win.ekey};
if (tSimpleHashGet(pScanRange, &checkKey, sizeof(SSessionKey)) != NULL) {
continue;
}
tSimpleHashPut(pScanRange, &checkKey, sizeof(SSessionKey), NULL, 0);
code = colDataSetVal(pDestStartCol, i, (const char*)&startWin.win.skey, false);
QUERY_CHECK_CODE(code, lino, _end);
@ -2238,6 +2246,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
if (pSrcBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
SSHashObj* pScanRange = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
@ -2297,6 +2306,12 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
code = colDataSetVal(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
QUERY_CHECK_CODE(code, lino, _end);
SSessionKey checkKey = {.groupId = groupId, .win = win};
if (tSimpleHashGet(pScanRange, &checkKey, sizeof(SSessionKey)) != NULL) {
continue;
}
tSimpleHashPut(pScanRange, &checkKey, sizeof(SSessionKey), NULL, 0);
code = colDataSetVal(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
QUERY_CHECK_CODE(code, lino, _end);
@ -2310,6 +2325,9 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
}
_end:
if (pScanRange != NULL) {
tSimpleHashCleanup(pScanRange);
}
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}