fix(stream): not clear task list if check failed, and add more checks.

This commit is contained in:
Haojun Liao 2025-01-21 10:27:39 +08:00
parent 7900c725d5
commit 08092aeb0c
1 changed files with 12 additions and 5 deletions

View File

@ -698,16 +698,23 @@ static int32_t allTasksSendChkptReport(SChkptReportInfo* pReportInfo, int32_t nu
// cross-check failed, there must be something unknown wrong // cross-check failed, there must be something unknown wrong
SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, &id.streamId, sizeof(id.streamId)); SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, &id.streamId, sizeof(id.streamId));
if (pTransInfo == NULL) { if (pTransInfo == NULL) {
mError("stream:0x%" PRIx64" no active exists for checkpoint transId:%d, clear checkpoint-report list", id.streamId, transId); mWarn("stream:0x%" PRIx64 " no active trans exists for checkpoint transId:%d, it may have been cleared already",
taosArrayClear(pReportInfo->pTaskList); id.streamId, transId);
return -1;
if (pe->checkpointInfo.activeId != 0 && pe->checkpointInfo.activeId != checkpointId) {
mWarn("stream:0x%" PRIx64 " active checkpointId is not equalled to the required, current:%" PRId64
", req:%" PRId64 " recheck next time",
id.streamId, pe->checkpointInfo.activeId, checkpointId);
return -1;
} else {
// do nothing
}
} }
if (pTransInfo->transId != transId) { if (pTransInfo->transId != transId) {
mError("stream:0x%" PRIx64 mError("stream:0x%" PRIx64
" checkpoint-report list info are expired, clear and retry, active transId:%d trans in list:%d", " checkpoint-report list info are expired, active transId:%d trans in list:%d, recheck next time",
id.streamId, pTransInfo->transId, transId); id.streamId, pTransInfo->transId, transId);
taosArrayClear(pReportInfo->pTaskList);
return -1; return -1;
} }