fix(stream): set the failed id before clear the checkpoint info.
This commit is contained in:
parent
1ffec769b8
commit
5585a141d5
|
@ -707,11 +707,11 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
} else { // clear the checkpoint info if failed
|
} else { // clear the checkpoint info if failed
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
streamTaskClearCheckInfo(pTask, false);
|
streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info
|
||||||
|
streamTaskClearCheckInfo(pTask, true);
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
||||||
streamTaskSetFailedCheckpointId(pTask);
|
|
||||||
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
|
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1063,6 +1063,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr,
|
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr,
|
||||||
(*pTask)->chkInfo.pActiveInfo->transId);
|
(*pTask)->chkInfo.pActiveInfo->transId);
|
||||||
}
|
}
|
||||||
|
(*pTask)->chkInfo.pActiveInfo->failedId = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*pTask)->exec.pWalReader != NULL) {
|
if ((*pTask)->exec.pWalReader != NULL) {
|
||||||
|
|
|
@ -1039,7 +1039,6 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
|
||||||
|
|
||||||
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
|
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
|
||||||
pInfo->activeId = 0; // clear the checkpoint id
|
pInfo->activeId = 0; // clear the checkpoint id
|
||||||
pInfo->failedId = 0;
|
|
||||||
pInfo->transId = 0;
|
pInfo->transId = 0;
|
||||||
pInfo->allUpstreamTriggerRecv = 0;
|
pInfo->allUpstreamTriggerRecv = 0;
|
||||||
pInfo->dispatchTrigger = false;
|
pInfo->dispatchTrigger = false;
|
||||||
|
|
Loading…
Reference in New Issue