Merge pull request #25432 from taosdata/fix/TD-29688

fix(stream):add option for update data
This commit is contained in:
Haojun Liao 2024-04-22 14:57:28 +08:00 committed by GitHub
commit b6316c2692
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 10 additions and 9 deletions

View File

@ -176,6 +176,7 @@ typedef enum EStreamType {
STREAM_CREATE_CHILD_TABLE,
STREAM_TRANS_STATE,
STREAM_MID_RETRIEVE,
STREAM_PARTITION_DELETE_DATA,
} EStreamType;
#pragma pack(push, 1)

View File

@ -1640,7 +1640,7 @@ static void getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, int
printDataBlock(pBlock, "new delete", taskIdStr);
}
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType mode) {
if (pSrcBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
@ -1659,7 +1659,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
}
int64_t ver = pSrcBlock->info.version - 1;
if (pInfo->partitionSup.needCalc && (startData[0] != endData[0] || hasPrimaryKey(pInfo))) {
if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) {
getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock);
startData = (TSKEY*)pStartTsCol->pData;
endData = (TSKEY*)pEndTsCol->pData;
@ -1736,7 +1736,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
}
int64_t ver = pSrcBlock->info.version - 1;
if (pInfo->partitionSup.needCalc && (startData[0] != endData[0] || hasPrimaryKey(pInfo))) {
if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) {
getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock);
startData = (TSKEY*)pStartTsCol->pData;
endData = (TSKEY*)pEndTsCol->pData;
@ -1779,7 +1779,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
return TSDB_CODE_SUCCESS;
}
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType mode) {
blockDataCleanup(pDestBlock);
if (pSrcBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
@ -1800,7 +1800,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
int64_t ver = pSrcBlock->info.version - 1;
if (pInfo->partitionSup.needCalc && (srcStartTsCol[0] != srcEndTsCol[0] || hasPrimaryKey(pInfo))) {
if (pInfo->partitionSup.needCalc && ( srcStartTsCol[0] != srcEndTsCol[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) {
getPreVersionDataBlock(srcUidData[0], srcStartTsCol[0], srcEndTsCol[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock);
srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
@ -1959,9 +1959,9 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType type) {
int32_t code = TSDB_CODE_SUCCESS;
if (isIntervalWindow(pInfo)) {
code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock, type);
} else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock, type);
} else if (isCountWindow(pInfo)) {
code = generateCountScanRange(pInfo, pSrcBlock, pDestBlock, type);
} else {
@ -2660,7 +2660,7 @@ FETCH_NEXT_BLOCK:
}
} break;
case STREAM_SCAN_FROM_DELETE_DATA: {
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_DELETE_DATA);
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_PARTITION_DELETE_DATA);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);

View File

@ -185,7 +185,7 @@ void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, ESt
return;
}
pDelRange->win = tmpKey.win;
while (mode == STREAM_DELETE_DATA) {
while (mode == STREAM_DELETE_DATA || mode == STREAM_PARTITION_DELETE_DATA) {
pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
if (code != TSDB_CODE_SUCCESS) {