fix ci bug

This commit is contained in:
liuyao 2023-07-10 13:45:18 +08:00
parent f4df89f755
commit ee196a88e6
2 changed files with 8 additions and 0 deletions

View File

@ -1771,6 +1771,7 @@ void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) {
void* pBuf = NULL;
int32_t len = streamScanOperatorEncode(pInfo, &pBuf);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len);
taosMemoryFree(pBuf);
}
// other properties are recovered from the execution plan

View File

@ -2673,6 +2673,7 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) {
len = doStreamIntervalEncodeOpState(&pBuf, pOperator);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len);
taosMemoryFree(buf);
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
@ -4675,6 +4676,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
doStreamSessionSaveCheckpoint(pOperator);
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId);
copyDataBlock(pInfo->pCheckpointRes, pBlock);
continue;
} else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
}