fix(tsma):drop invalid state for tsma

This commit is contained in:
54liuyao 2025-02-20 11:52:04 +08:00
parent 1c5e545337
commit 870fe1c071
9 changed files with 69 additions and 2 deletions

View File

@ -361,6 +361,7 @@ typedef struct SStateStore {
bool (*streamStateCheck)(SStreamState* pState, const SWinKey* key);
int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal);
void (*streamStateDel)(SStreamState* pState, const SWinKey* key);
void (*streamStateDelByGroupId)(SStreamState* pState, uint64_t groupId);
void (*streamStateClear)(SStreamState* pState);
void (*streamStateSetNumber)(SStreamState* pState, int32_t number, int32_t tsIdex);
void (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);

View File

@ -44,6 +44,7 @@ int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, in
bool streamStateCheck(SStreamState* pState, const SWinKey* key);
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal);
void streamStateDel(SStreamState* pState, const SWinKey* key);
void streamStateDelByGroupId(SStreamState* pState, uint64_t groupId);
void streamStateClear(SStreamState* pState);
void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex);
void streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);

View File

@ -59,6 +59,7 @@ int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t k
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
int32_t* pWinCode);
void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId);
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal);
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);

View File

@ -43,6 +43,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateCheck = streamStateCheck;
pStore->streamStateGetByPos = streamStateGetByPos;
pStore->streamStateDel = streamStateDel;
pStore->streamStateDelByGroupId = streamStateDelByGroupId;
pStore->streamStateClear = streamStateClear;
pStore->streamStateSaveInfo = streamStateSaveInfo;
pStore->streamStateGetInfo = streamStateGetInfo;

View File

@ -166,6 +166,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateCheck = streamStateCheck;
pStore->streamStateGetByPos = streamStateGetByPos;
pStore->streamStateDel = streamStateDel;
pStore->streamStateDelByGroupId = streamStateDelByGroupId;
pStore->streamStateClear = streamStateClear;
pStore->streamStateSaveInfo = streamStateSaveInfo;
pStore->streamStateGetInfo = streamStateGetInfo;

View File

@ -3818,7 +3818,11 @@ FETCH_NEXT_BLOCK:
int32_t deleteNum = 0;
code = deletePartName(pInfo, pBlock, &deleteNum);
QUERY_CHECK_CODE(code, lino, _end);
if (deleteNum == 0) goto FETCH_NEXT_BLOCK;
if (deleteNum == 0) {
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "block recv", GET_TASKID(pTaskInfo));
qDebug("===stream=== ignore block type 18, delete num is 0");
goto FETCH_NEXT_BLOCK;
}
} break;
case STREAM_CHECKPOINT: {
qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK");

View File

@ -237,6 +237,29 @@ static void doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId)
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
static void doDeleteWindowByGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SColumnInfoData* pGpIdCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* pGroupIdData = (uint64_t*)pGpIdCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
uint64_t groupId = pGroupIdData[i];
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) {
size_t keyLen = 0;
SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
if (pKey->groupId == groupId) {
int32_t tmpRes = tSimpleHashIterateRemove(pInfo->aggSup.pResultRowHashTable, pKey, keyLen, &pIte, &iter);
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
}
pAPI->stateStore.streamStateDelByGroupId(pInfo->pState, groupId);
}
}
static int32_t doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
SSHashObj* pUpdatedMap, SHashObj* pInvalidWins) {
int32_t code = TSDB_CODE_SUCCESS;
@ -5232,7 +5255,12 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
code = getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_DROP_CHILD_TABLE) {
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
(*ppRes) = pBlock;
return code;
} else if (pBlock->info.type == STREAM_DROP_CHILD_TABLE) {
doDeleteWindowByGroupId(pOperator, pBlock);
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
(*ppRes) = pBlock;
return code;

View File

@ -224,6 +224,10 @@ void streamStateDel(SStreamState* pState, const SWinKey* key) {
deleteRowBuff(pState->pFileState, key, sizeof(SWinKey));
}
void streamStateDelByGroupId(SStreamState* pState, uint64_t groupId) {
deleteRowBuffByGroupId(pState->pFileState, groupId);
}
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
return streamStateFillPut_rocksdb(pState, key, value, vLen);
}

View File

@ -667,6 +667,32 @@ void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLe
}
}
void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId) {
SSHashObj* pRowMap = pFileState->rowStateBuff;
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pRowMap, pIte, &iter)) != NULL) {
size_t keyLen = 0;
SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
if (pKey->groupId == groupId) {
int32_t tmpRes = tSimpleHashIterateRemove(pRowMap, pKey, keyLen, &pIte, &iter);
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
}
while (1) {
SWinKey tmp = {.ts = INT64_MIN, .groupId = groupId};
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pFileState->pFileStore, &tmp);
SWinKey delKey = {.groupId = groupId};
int32_t code = streamStateGetGroupKVByCur_rocksdb(pFileState->pFileStore, pCur, &delKey, NULL, 0);
if (code != TSDB_CODE_SUCCESS) {
break;
}
code = streamStateDel_rocksdb(pFileState->pFileStore, &delKey);
qTrace("%s at line %d res:%d", __func__, __LINE__, code);
}
}
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;