refactor:do some internal refactor.

This commit is contained in:
Haojun Liao 2023-11-08 16:57:55 +08:00
parent 470f244032
commit f949333e8b
1 changed files with 74 additions and 38 deletions

View File

@ -270,80 +270,116 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) {
int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
SStreamMeta* pMeta = p->pMeta;
int32_t vgId = pMeta->vgId;
const char* id = p->id.idStr;
int32_t code = 0;
if (p->info.fillHistory == 1) {
return code;
}
streamMetaWLock(pMeta);
ASSERT(p->chkInfo.checkpointId < p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId);
taosThreadMutexLock(&p->lock);
ASSERT(p->chkInfo.checkpointId < p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId);
p->chkInfo.checkpointId = p->chkInfo.checkpointingId;
streamTaskClearCheckInfo(p);
char* str = NULL;
streamTaskGetStatus(p, &str);
code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&p->lock);
if (code != TSDB_CODE_SUCCESS) {
stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId);
streamMetaWUnLock(pMeta);
stDebug("s-task:%s vgId:%d handle event:checkpoint-done failed", id, vgId);
return -1;
} else { // save the task
streamMetaSaveTask(pMeta, p);
}
stDebug(
"vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, "
"checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, str);
stDebug("vgId:%d s-task:%s level:%d open upstream inputQ, save status after checkpoint, checkpointId:%" PRId64
", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: normal, prev:%s",
vgId, id, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, str);
code = streamMetaCommit(pMeta);
if (code < 0) {
stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId,
checkpointId, terrstr());
} else {
stInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 " DONE", pMeta->vgId, checkpointId);
// save the task if not sink task
if (p->info.taskLevel != TASK_LEVEL__SINK) {
streamMetaWLock(pMeta);
code = streamMetaSaveTask(pMeta, p);
if (code != TSDB_CODE_SUCCESS) {
streamMetaWUnLock(pMeta);
stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id,
vgId, checkpointId, terrstr());
return code;
}
code = streamMetaCommit(pMeta);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s",
id, vgId, checkpointId, terrstr());
}
streamMetaWUnLock(pMeta);
}
streamMetaWUnLock(pMeta);
return code;
}
void streamTaskSetFailedId(SStreamTask* pTask) {
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
pTask->chkInfo.checkpointId = pTask->chkInfo.checkpointingId;
}
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
// check for all tasks, and do generate the vnode-wide checkpoint data.
int64_t checkpointStartTs = pTask->chkInfo.startTs;
int64_t startTs = pTask->chkInfo.startTs;
int64_t ckId = pTask->chkInfo.checkpointingId;
// sink task do not need to save the status, and generated the checkpoint
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
stDebug("s-task:%s level:%d start gen checkpoint", pTask->id.idStr, pTask->info.taskLevel);
streamBackendDoCheckpoint(pTask->pBackend, pTask->chkInfo.checkpointingId);
streamSaveTaskCheckpointInfo(pTask, pTask->chkInfo.checkpointingId);
code = streamBackendDoCheckpoint(pTask->pBackend, ckId);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId, tstrerror(terrno));
}
}
double el = (taosGetTimestampMs() - checkpointStartTs) / 1000.0;
stInfo("s-task:%s vgId:%d checkpointId:%" PRId64 " save all tasks status, level:%d elapsed time:%.2f Sec ",
pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.checkpointingId, pTask->info.taskLevel, el);
// send check point response to upstream task
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
} else {
code = streamTaskSendCheckpointReadyMsg(pTask);
if (code == TSDB_CODE_SUCCESS) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
} else {
code = streamTaskSendCheckpointReadyMsg(pTask);
}
if (code != TSDB_CODE_SUCCESS) {
// todo: let's retry send rsp to upstream/mnode
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr,
ckId, tstrerror(code));
}
}
if (code != TSDB_CODE_SUCCESS) {
// record the failure checkpoint id
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
// todo: let's retry send rsp to upstream/mnode
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr,
pTask->chkInfo.checkpointingId, tstrerror(code));
// clear the checkpoint info, and commit the newest checkpoint info if all works are done successfully
if (code == TSDB_CODE_SUCCESS) {
code = streamSaveTaskCheckpointInfo(pTask, ckId);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId,
tstrerror(terrno));
}
}
if (code != TSDB_CODE_SUCCESS) { // clear the checkpoint info if failed
taosThreadMutexLock(&pTask->lock);
streamTaskClearCheckInfo(pTask);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&pTask->lock);
streamTaskSetFailedId(pTask);
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%"PRId64, pTask->id.idStr, ckId);
}
double el = (taosGetTimestampMs() - startTs) / 1000.0;
stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ",
pTask->id.idStr, pTask->pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
(code == TSDB_CODE_SUCCESS) ? "succ" : "failed");
return code;
}