Merge pull request #29848 from taosdata/fix/ly_stream

fix(tsma):drop invalid state for tsma
This commit is contained in:
Shengliang Guan 2025-02-20 15:04:03 +08:00 committed by GitHub
commit 42bafc0d47
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 69 additions and 2 deletions

View File

@ -361,6 +361,7 @@ typedef struct SStateStore {
bool (*streamStateCheck)(SStreamState* pState, const SWinKey* key); bool (*streamStateCheck)(SStreamState* pState, const SWinKey* key);
int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal); int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal);
void (*streamStateDel)(SStreamState* pState, const SWinKey* key); void (*streamStateDel)(SStreamState* pState, const SWinKey* key);
void (*streamStateDelByGroupId)(SStreamState* pState, uint64_t groupId);
void (*streamStateClear)(SStreamState* pState); void (*streamStateClear)(SStreamState* pState);
void (*streamStateSetNumber)(SStreamState* pState, int32_t number, int32_t tsIdex); void (*streamStateSetNumber)(SStreamState* pState, int32_t number, int32_t tsIdex);
void (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); void (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);

View File

@ -44,6 +44,7 @@ int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, in
bool streamStateCheck(SStreamState* pState, const SWinKey* key); bool streamStateCheck(SStreamState* pState, const SWinKey* key);
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal); int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal);
void streamStateDel(SStreamState* pState, const SWinKey* key); void streamStateDel(SStreamState* pState, const SWinKey* key);
void streamStateDelByGroupId(SStreamState* pState, uint64_t groupId);
void streamStateClear(SStreamState* pState); void streamStateClear(SStreamState* pState);
void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex); void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex);
void streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); void streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);

View File

@ -59,6 +59,7 @@ int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t k
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen, int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
int32_t* pWinCode); int32_t* pWinCode);
void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId);
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal); int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal);
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen); bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);

View File

@ -43,6 +43,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateCheck = streamStateCheck; pStore->streamStateCheck = streamStateCheck;
pStore->streamStateGetByPos = streamStateGetByPos; pStore->streamStateGetByPos = streamStateGetByPos;
pStore->streamStateDel = streamStateDel; pStore->streamStateDel = streamStateDel;
pStore->streamStateDelByGroupId = streamStateDelByGroupId;
pStore->streamStateClear = streamStateClear; pStore->streamStateClear = streamStateClear;
pStore->streamStateSaveInfo = streamStateSaveInfo; pStore->streamStateSaveInfo = streamStateSaveInfo;
pStore->streamStateGetInfo = streamStateGetInfo; pStore->streamStateGetInfo = streamStateGetInfo;

View File

@ -166,6 +166,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateCheck = streamStateCheck; pStore->streamStateCheck = streamStateCheck;
pStore->streamStateGetByPos = streamStateGetByPos; pStore->streamStateGetByPos = streamStateGetByPos;
pStore->streamStateDel = streamStateDel; pStore->streamStateDel = streamStateDel;
pStore->streamStateDelByGroupId = streamStateDelByGroupId;
pStore->streamStateClear = streamStateClear; pStore->streamStateClear = streamStateClear;
pStore->streamStateSaveInfo = streamStateSaveInfo; pStore->streamStateSaveInfo = streamStateSaveInfo;
pStore->streamStateGetInfo = streamStateGetInfo; pStore->streamStateGetInfo = streamStateGetInfo;

View File

@ -3818,7 +3818,11 @@ FETCH_NEXT_BLOCK:
int32_t deleteNum = 0; int32_t deleteNum = 0;
code = deletePartName(pInfo, pBlock, &deleteNum); code = deletePartName(pInfo, pBlock, &deleteNum);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (deleteNum == 0) goto FETCH_NEXT_BLOCK; if (deleteNum == 0) {
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "block recv", GET_TASKID(pTaskInfo));
qDebug("===stream=== ignore block type 18, delete num is 0");
goto FETCH_NEXT_BLOCK;
}
} break; } break;
case STREAM_CHECKPOINT: { case STREAM_CHECKPOINT: {
qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK"); qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK");

View File

@ -237,6 +237,29 @@ static void doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId)
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
static void doDeleteWindowByGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SColumnInfoData* pGpIdCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* pGroupIdData = (uint64_t*)pGpIdCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
uint64_t groupId = pGroupIdData[i];
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) {
size_t keyLen = 0;
SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
if (pKey->groupId == groupId) {
int32_t tmpRes = tSimpleHashIterateRemove(pInfo->aggSup.pResultRowHashTable, pKey, keyLen, &pIte, &iter);
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
}
pAPI->stateStore.streamStateDelByGroupId(pInfo->pState, groupId);
}
}
static int32_t doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins, static int32_t doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
SSHashObj* pUpdatedMap, SHashObj* pInvalidWins) { SSHashObj* pUpdatedMap, SHashObj* pInvalidWins) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -5232,7 +5255,12 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
code = getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); code = getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
continue; continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_DROP_CHILD_TABLE) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
(*ppRes) = pBlock;
return code;
} else if (pBlock->info.type == STREAM_DROP_CHILD_TABLE) {
doDeleteWindowByGroupId(pOperator, pBlock);
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
(*ppRes) = pBlock; (*ppRes) = pBlock;
return code; return code;

View File

@ -224,6 +224,10 @@ void streamStateDel(SStreamState* pState, const SWinKey* key) {
deleteRowBuff(pState->pFileState, key, sizeof(SWinKey)); deleteRowBuff(pState->pFileState, key, sizeof(SWinKey));
} }
void streamStateDelByGroupId(SStreamState* pState, uint64_t groupId) {
deleteRowBuffByGroupId(pState->pFileState, groupId);
}
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
return streamStateFillPut_rocksdb(pState, key, value, vLen); return streamStateFillPut_rocksdb(pState, key, value, vLen);
} }

View File

@ -667,6 +667,32 @@ void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLe
} }
} }
void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId) {
SSHashObj* pRowMap = pFileState->rowStateBuff;
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pRowMap, pIte, &iter)) != NULL) {
size_t keyLen = 0;
SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
if (pKey->groupId == groupId) {
int32_t tmpRes = tSimpleHashIterateRemove(pRowMap, pKey, keyLen, &pIte, &iter);
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
}
while (1) {
SWinKey tmp = {.ts = INT64_MIN, .groupId = groupId};
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pFileState->pFileStore, &tmp);
SWinKey delKey = {.groupId = groupId};
int32_t code = streamStateGetGroupKVByCur_rocksdb(pFileState->pFileStore, pCur, &delKey, NULL, 0);
if (code != TSDB_CODE_SUCCESS) {
break;
}
code = streamStateDel_rocksdb(pFileState->pFileStore, &delKey);
qTrace("%s at line %d res:%d", __func__, __LINE__, code);
}
}
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;