fix(stream): clear the active checkpoint info after report to mnode.

This commit is contained in:
Haojun Liao 2024-06-28 23:40:05 +08:00
parent 5585a141d5
commit e74b8473f1
3 changed files with 12 additions and 9 deletions

View File

@ -708,7 +708,6 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
} else { // clear the checkpoint info if failed
taosThreadMutexLock(&pTask->lock);
streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info
streamTaskClearCheckInfo(pTask, true);
taosThreadMutexUnlock(&pTask->lock);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);

View File

@ -1053,17 +1053,20 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
}
if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) {
entry.checkpointInfo.failed =
((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0;
entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId;
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId;
SActiveCheckpointInfo* p = (*pTask)->chkInfo.pActiveInfo;
if (p->activeId != 0) {
entry.checkpointInfo.failed = (p->failedId >= p->activeId) ? 1 : 0;
entry.checkpointInfo.activeId = p->activeId;
entry.checkpointInfo.activeTransId = p->transId;
if (entry.checkpointInfo.failed) {
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr,
(*pTask)->chkInfo.pActiveInfo->transId);
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d, clear the active checkpointInfo",
(*pTask)->id.idStr, p->transId);
taosThreadMutexLock(&(*pTask)->lock);
streamTaskClearCheckInfo((*pTask), true);
taosThreadMutexUnlock(&(*pTask)->lock);
}
(*pTask)->chkInfo.pActiveInfo->failedId = 0;
}
if ((*pTask)->exec.pWalReader != NULL) {

View File

@ -1042,6 +1042,7 @@ void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
pInfo->transId = 0;
pInfo->allUpstreamTriggerRecv = 0;
pInfo->dispatchTrigger = false;
pInfo->failedId = 0;
taosArrayClear(pInfo->pDispatchTriggerList);
taosArrayClear(pInfo->pCheckpointReadyRecvList);