fix(stream): record the checkpoint failure when nodeEp changed.

This commit is contained in:
Haojun Liao 2023-12-21 10:30:44 +08:00
parent 32ce4b6a4c
commit 8a8ddf5699
3 changed files with 18 additions and 14 deletions

View File

@ -116,6 +116,7 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq,
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
void streamTaskSetCheckpointFailedId(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskExtractKey(const SStreamTask* pTask);
@ -137,17 +138,6 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
// <<<<<<< HEAD
// void streamClearChkptReadyMsg(SStreamTask* pTask);
// int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const
// char*); STaskId streamTaskExtractKey(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo*
// pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
// void streamMetaResetStartInfo(STaskStartInfo* pMeta);
// =======
// >>>>>>> 3.0
SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
void streamQueueProcessSuccess(SStreamQueue* queue);

View File

@ -341,9 +341,8 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
return code;
}
void streamTaskSetFailedId(SStreamTask* pTask) {
void streamTaskSetCheckpointFailedId(SStreamTask* pTask) {
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
pTask->chkInfo.checkpointId = pTask->chkInfo.checkpointingId;
}
int32_t getChkpMeta(char* id, char* path, SArray* list) {
@ -485,7 +484,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&pTask->lock);
streamTaskSetFailedId(pTask);
streamTaskSetCheckpointFailedId(pTask);
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, pTask->id.idStr,
ckId);
}

View File

@ -313,9 +313,24 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
", prev:%" PRId64,
id, upstreamTaskId, vgId, stage, pInfo->stage);
// record the checkpoint failure id and sent to mnode
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
if (status == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
}
taosThreadMutexUnlock(&pTask->lock);
}
if (pInfo->stage != stage) {
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
if (status == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
}
taosThreadMutexUnlock(&pTask->lock);
return TASK_UPSTREAM_NEW_STAGE;
} else if (pTask->status.downstreamReady != 1) {
stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));