Merge pull request #28641 from taosdata/fix/TD-32815

fix(stream):check scan range of stream operator
This commit is contained in:
Shengliang Guan 2024-11-06 13:35:24 +08:00 committed by GitHub
commit 6ff8de1d22
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 19 additions and 0 deletions

View File

@ -2068,6 +2068,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
if (pSrcBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
SSHashObj* pScanRange = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
@ -2128,6 +2129,14 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]);
continue;
}
SSessionKey checkKey = {.groupId = groupId, .win.skey = startWin.win.skey, .win.ekey = endWin.win.ekey};
if (tSimpleHashGet(pScanRange, &checkKey, sizeof(SSessionKey)) != NULL) {
continue;
}
code = tSimpleHashPut(pScanRange, &checkKey, sizeof(SSessionKey), NULL, 0);
QUERY_CHECK_CODE(code, lino, _end);
code = colDataSetVal(pDestStartCol, i, (const char*)&startWin.win.skey, false);
QUERY_CHECK_CODE(code, lino, _end);
@ -2144,6 +2153,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
}
_end:
tSimpleHashCleanup(pScanRange);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
@ -2238,6 +2248,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
if (pSrcBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
SSHashObj* pScanRange = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
@ -2297,6 +2308,13 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
code = colDataSetVal(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
QUERY_CHECK_CODE(code, lino, _end);
SSessionKey checkKey = {.groupId = groupId, .win = win};
if (tSimpleHashGet(pScanRange, &checkKey, sizeof(SSessionKey)) != NULL) {
continue;
}
code = tSimpleHashPut(pScanRange, &checkKey, sizeof(SSessionKey), NULL, 0);
QUERY_CHECK_CODE(code, lino, _end);
code = colDataSetVal(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
QUERY_CHECK_CODE(code, lino, _end);
@ -2310,6 +2328,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
}
_end:
tSimpleHashCleanup(pScanRange);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}