refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-01-15 09:21:31 +08:00
parent dc1a0821a7
commit ce855bc493
1 changed files with 22 additions and 25 deletions

View File

@ -294,11 +294,8 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
int32_t code = 0;
SCheckpointInfo* pCKInfo = &p->chkInfo;
if (p->info.fillHistory == 1) {
return code;
}
if (p->info.taskLevel > TASK_LEVEL__SINK) {
// fill-history task, rsma task, and sink task will not generate the checkpoint
if ((p->info.fillHistory == 1) || (p->info.taskLevel >= TASK_LEVEL__SINK)) {
return code;
}
@ -333,7 +330,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, pStatus->name);
// save the task if not sink task
if (p->info.taskLevel != TASK_LEVEL__SINK) {
if (p->info.taskLevel < TASK_LEVEL__SINK) {
streamMetaWLock(pMeta);
code = streamMetaSaveTask(pMeta, p);
@ -454,13 +451,14 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t startTs = pTask->chkInfo.startTs;
int64_t ckId = pTask->chkInfo.checkpointingId;
const char* id = pTask->id.idStr;
// sink task do not need to save the status, and generated the checkpoint
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
stDebug("s-task:%s level:%d start gen checkpoint", pTask->id.idStr, pTask->info.taskLevel);
stDebug("s-task:%s level:%d start gen checkpoint", id, pTask->info.taskLevel);
code = streamBackendDoCheckpoint(pTask->pBackend, ckId);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId, tstrerror(terrno));
stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno));
}
}
@ -474,39 +472,38 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
if (code != TSDB_CODE_SUCCESS) {
// todo: let's retry send rsp to upstream/mnode
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr,
ckId, tstrerror(code));
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId,
tstrerror(code));
}
}
// clear the checkpoint info, and commit the newest checkpoint info if all works are done successfully
if (code == TSDB_CODE_SUCCESS) {
code = streamSaveTaskCheckpointInfo(pTask, ckId);
if (code == TSDB_CODE_SUCCESS) {
code = streamTaskUploadChkp(pTask, ckId, (char*)id);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId,
tstrerror(code));
} else {
code = streamTaskUploadChkp(pTask, ckId, (char*)pTask->id.idStr);
if (code != 0) {
stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", pTask->id.idStr, ckId);
stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId);
}
} else {
stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
}
}
if (code != TSDB_CODE_SUCCESS) { // clear the checkpoint info if failed
// clear the checkpoint info if failed
if (code != TSDB_CODE_SUCCESS) {
taosThreadMutexLock(&pTask->lock);
streamTaskClearCheckInfo(pTask, false);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&pTask->lock);
streamTaskSetCheckpointFailedId(pTask);
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, pTask->id.idStr,
ckId);
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
}
double el = (taosGetTimestampMs() - startTs) / 1000.0;
stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ",
pTask->id.idStr, pTask->pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", id,
pTask->pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
(code == TSDB_CODE_SUCCESS) ? "succ" : "failed");
return code;