From 870fe1c071d3b9a8e6eb6901677945ef2944834c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 20 Feb 2025 11:52:04 +0800 Subject: [PATCH] fix(tsma):drop invalid state for tsma --- include/libs/executor/storageapi.h | 1 + include/libs/stream/streamState.h | 1 + include/libs/stream/tstreamFileState.h | 1 + source/dnode/snode/src/snodeInitApi.c | 1 + source/dnode/vnode/src/vnd/vnodeInitApi.c | 1 + source/libs/executor/src/scanoperator.c | 6 +++- .../executor/src/streamtimewindowoperator.c | 30 ++++++++++++++++++- source/libs/stream/src/streamState.c | 4 +++ source/libs/stream/src/tstreamFileState.c | 26 ++++++++++++++++ 9 files changed, 69 insertions(+), 2 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 3cc2acf30f..3ed5d82f98 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -361,6 +361,7 @@ typedef struct SStateStore { bool (*streamStateCheck)(SStreamState* pState, const SWinKey* key); int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal); void (*streamStateDel)(SStreamState* pState, const SWinKey* key); + void (*streamStateDelByGroupId)(SStreamState* pState, uint64_t groupId); void (*streamStateClear)(SStreamState* pState); void (*streamStateSetNumber)(SStreamState* pState, int32_t number, int32_t tsIdex); void (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index b4e0087b1a..bab1d9438c 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -44,6 +44,7 @@ int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, in bool streamStateCheck(SStreamState* pState, const SWinKey* key); int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal); void streamStateDel(SStreamState* pState, const SWinKey* key); +void streamStateDelByGroupId(SStreamState* pState, uint64_t groupId); void streamStateClear(SStreamState* pState); void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex); void streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index f47c308e18..7463d4d130 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -59,6 +59,7 @@ int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t k int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen, int32_t* pWinCode); void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); +void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId); int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal); bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen); int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index 68dc981338..a35baa0092 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -43,6 +43,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateCheck = streamStateCheck; pStore->streamStateGetByPos = streamStateGetByPos; pStore->streamStateDel = streamStateDel; + pStore->streamStateDelByGroupId = streamStateDelByGroupId; pStore->streamStateClear = streamStateClear; pStore->streamStateSaveInfo = streamStateSaveInfo; pStore->streamStateGetInfo = streamStateGetInfo; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index b8682028cf..0b6c13d2db 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -166,6 +166,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateCheck = streamStateCheck; pStore->streamStateGetByPos = streamStateGetByPos; pStore->streamStateDel = streamStateDel; + pStore->streamStateDelByGroupId = streamStateDelByGroupId; pStore->streamStateClear = streamStateClear; pStore->streamStateSaveInfo = streamStateSaveInfo; pStore->streamStateGetInfo = streamStateGetInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1060cbcffe..ce2a5019a6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3818,7 +3818,11 @@ FETCH_NEXT_BLOCK: int32_t deleteNum = 0; code = deletePartName(pInfo, pBlock, &deleteNum); QUERY_CHECK_CODE(code, lino, _end); - if (deleteNum == 0) goto FETCH_NEXT_BLOCK; + if (deleteNum == 0) { + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "block recv", GET_TASKID(pTaskInfo)); + qDebug("===stream=== ignore block type 18, delete num is 0"); + goto FETCH_NEXT_BLOCK; + } } break; case STREAM_CHECKPOINT: { qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK"); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 031d2e8bdc..5cab26b9d3 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -237,6 +237,29 @@ static void doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } +static void doDeleteWindowByGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock) { + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + + SColumnInfoData* pGpIdCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); + uint64_t* pGroupIdData = (uint64_t*)pGpIdCol->pData; + for (int32_t i = 0; i < pBlock->info.rows; i++) { + uint64_t groupId = pGroupIdData[i]; + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) { + size_t keyLen = 0; + SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen); + if (pKey->groupId == groupId) { + int32_t tmpRes = tSimpleHashIterateRemove(pInfo->aggSup.pResultRowHashTable, pKey, keyLen, &pIte, &iter); + qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes); + } + } + + pAPI->stateStore.streamStateDelByGroupId(pInfo->pState, groupId); + } +} + static int32_t doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins, SSHashObj* pUpdatedMap, SHashObj* pInvalidWins) { int32_t code = TSDB_CODE_SUCCESS; @@ -5232,7 +5255,12 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p code = getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); QUERY_CHECK_CODE(code, lino, _end); continue; - } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_DROP_CHILD_TABLE) { + } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + (*ppRes) = pBlock; + return code; + } else if (pBlock->info.type == STREAM_DROP_CHILD_TABLE) { + doDeleteWindowByGroupId(pOperator, pBlock); printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); (*ppRes) = pBlock; return code; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 7259c0e49a..621be05e84 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -224,6 +224,10 @@ void streamStateDel(SStreamState* pState, const SWinKey* key) { deleteRowBuff(pState->pFileState, key, sizeof(SWinKey)); } +void streamStateDelByGroupId(SStreamState* pState, uint64_t groupId) { + deleteRowBuffByGroupId(pState->pFileState, groupId); +} + int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { return streamStateFillPut_rocksdb(pState, key, value, vLen); } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index aaff58d1b4..d6dfde1ee6 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -667,6 +667,32 @@ void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLe } } +void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId) { + SSHashObj* pRowMap = pFileState->rowStateBuff; + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pRowMap, pIte, &iter)) != NULL) { + size_t keyLen = 0; + SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen); + if (pKey->groupId == groupId) { + int32_t tmpRes = tSimpleHashIterateRemove(pRowMap, pKey, keyLen, &pIte, &iter); + qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes); + } + } + + while (1) { + SWinKey tmp = {.ts = INT64_MIN, .groupId = groupId}; + SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pFileState->pFileStore, &tmp); + SWinKey delKey = {.groupId = groupId}; + int32_t code = streamStateGetGroupKVByCur_rocksdb(pFileState->pFileStore, pCur, &delKey, NULL, 0); + if (code != TSDB_CODE_SUCCESS) { + break; + } + code = streamStateDel_rocksdb(pFileState->pFileStore, &delKey); + qTrace("%s at line %d res:%d", __func__, __LINE__, code); + } +} + static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0;