From 2d48c2c9939efcfba5274b3fd91be4292b928072 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 22 Apr 2024 10:41:01 +0800 Subject: [PATCH] fix(stream):add option for update data --- include/common/tcommon.h | 1 + source/libs/executor/src/scanoperator.c | 16 ++++++++-------- .../executor/src/streamcountwindowoperator.c | 2 +- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 0968c59399..2c4a00a72d 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -176,6 +176,7 @@ typedef enum EStreamType { STREAM_CREATE_CHILD_TABLE, STREAM_TRANS_STATE, STREAM_MID_RETRIEVE, + STREAM_PARTITION_DELETE_DATA, } EStreamType; #pragma pack(push, 1) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9dfe6230d9..c30059fffd 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1640,7 +1640,7 @@ static void getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, int printDataBlock(pBlock, "new delete", taskIdStr); } -static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) { +static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType mode) { if (pSrcBlock->info.rows == 0) { return TSDB_CODE_SUCCESS; } @@ -1659,7 +1659,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr } int64_t ver = pSrcBlock->info.version - 1; - if (pInfo->partitionSup.needCalc && (startData[0] != endData[0] || hasPrimaryKey(pInfo))) { + if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) { getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock); startData = (TSKEY*)pStartTsCol->pData; endData = (TSKEY*)pEndTsCol->pData; @@ -1736,7 +1736,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB } int64_t ver = pSrcBlock->info.version - 1; - if (pInfo->partitionSup.needCalc && (startData[0] != endData[0] || hasPrimaryKey(pInfo))) { + if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) { getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock); startData = (TSKEY*)pStartTsCol->pData; endData = (TSKEY*)pEndTsCol->pData; @@ -1779,7 +1779,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB return TSDB_CODE_SUCCESS; } -static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) { +static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType mode) { blockDataCleanup(pDestBlock); if (pSrcBlock->info.rows == 0) { return TSDB_CODE_SUCCESS; @@ -1800,7 +1800,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; int64_t ver = pSrcBlock->info.version - 1; - if (pInfo->partitionSup.needCalc && (srcStartTsCol[0] != srcEndTsCol[0] || hasPrimaryKey(pInfo))) { + if (pInfo->partitionSup.needCalc && ( srcStartTsCol[0] != srcEndTsCol[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) { getPreVersionDataBlock(srcUidData[0], srcStartTsCol[0], srcEndTsCol[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock); srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; @@ -1959,9 +1959,9 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType type) { int32_t code = TSDB_CODE_SUCCESS; if (isIntervalWindow(pInfo)) { - code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock); + code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock, type); } else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) { - code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock); + code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock, type); } else if (isCountWindow(pInfo)) { code = generateCountScanRange(pInfo, pSrcBlock, pDestBlock, type); } else { @@ -2660,7 +2660,7 @@ FETCH_NEXT_BLOCK: } } break; case STREAM_SCAN_FROM_DELETE_DATA: { - generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_DELETE_DATA); + generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_PARTITION_DELETE_DATA); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index b66df120b4..f2d3bbb29a 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -185,7 +185,7 @@ void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, ESt return; } pDelRange->win = tmpKey.win; - while (mode == STREAM_DELETE_DATA) { + while (mode == STREAM_DELETE_DATA || mode == STREAM_PARTITION_DELETE_DATA) { pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur); code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0); if (code != TSDB_CODE_SUCCESS) {