fix(stream): mark the timer launched by which checkpoint procedure.

This commit is contained in:
Haojun Liao 2024-07-15 14:23:33 +08:00
parent 142f9132a5
commit c4cde6f268
2 changed files with 16 additions and 7 deletions

View File

@ -63,11 +63,7 @@ struct SActiveCheckpointInfo {
tmr_h pChkptTriggerTmr;
int32_t sendReadyCheckCounter;
tmr_h pSendReadyMsgTmr;
};
struct SConsensusCheckpoint {
int8_t inProcess;
int64_t sendReadyTmrChkptId;
};
typedef struct {
@ -227,7 +223,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta);
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
int32_t streamTaskDownloadCheckpointData(const char* id, char* path);
int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t checkpointId);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask);

View File

@ -815,6 +815,16 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
SArray* pList = pActiveInfo->pReadyMsgList;
int32_t num = taosArrayGetSize(pList);
if (pActiveInfo->sendReadyTmrChkptId < pActiveInfo->activeId) {
taosThreadMutexUnlock(&pActiveInfo->lock);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stWarn("s-task:%s vgId:%d tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 ", quit, ref:%d",
id, vgId, pActiveInfo->sendReadyTmrChkptId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
// active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) && (pActiveInfo->transId == 0) && (num == 0) && (pTask->chkInfo.startTs == 0)) {
taosThreadMutexUnlock(&pActiveInfo->lock);
@ -902,7 +912,6 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
pInfo->upstreamTaskId);
}
taosThreadMutexUnlock(&pActiveInfo->lock);
stDebug("s-task:%s level:%d checkpoint-ready msg sent to all %d upstreams", id, pTask->info.taskLevel, num);
// start to check if checkpoint ready msg has successfully received by upstream tasks.
@ -916,8 +925,12 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
} else {
taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr);
}
// mark the timer monitor checkpointId
pActiveInfo->sendReadyTmrChkptId = pActiveInfo->activeId;
}
taosThreadMutexUnlock(&pActiveInfo->lock);
return TSDB_CODE_SUCCESS;
}