fix(stream): check for checkpoint interrpution in sendReady monitor.

This commit is contained in:
Haojun Liao 2024-07-04 17:48:58 +08:00
parent c35c634977
commit e6defda0d0
2 changed files with 16 additions and 3 deletions

View File

@ -405,12 +405,14 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
pTask->chkInfo.startTs = 0; // clear the recorded start time
streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo);
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
taosThreadMutexLock(&pTask->chkInfo.pActiveInfo->lock);
streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo);
if (clearChkpReadyMsg) {
streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
}
taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock);
}
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {

View File

@ -813,9 +813,20 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
taosThreadMutexLock(&pActiveInfo->lock);
SArray* pList = pActiveInfo->pReadyMsgList;
int32_t num = taosArrayGetSize(pList);
// active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) && (pActiveInfo->transId == 0) && (num == 0) && (pTask->chkInfo.startTs == 0)) {
taosThreadMutexUnlock(&pActiveInfo->lock);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stWarn("s-task:0x%x vgId:%d active checkpoint may failed, quit from readyMsg send tmr, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t));
int32_t num = taosArrayGetSize(pList);
ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num);
for (int32_t i = 0; i < num; ++i) {