fix(stream):synchronized upload checkpoint data to snode.

This commit is contained in:
Haojun Liao 2024-06-21 09:11:16 +08:00
parent bf16c596a6
commit 8fd9baf6f5
1 changed files with 48 additions and 55 deletions

View File

@ -562,76 +562,67 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
return code;
}
int32_t uploadCheckpointData(void* param) {
SAsyncUploadArg* pParam = param;
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
char* path = NULL;
int32_t code = 0;
SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
char* taskStr = pParam->taskId ? pParam->taskId : "NULL";
int64_t now = taosGetTimestampMs();
SStreamMeta* pMeta = pTask->pMeta;
const char* idStr = pTask->id.idStr;
void* pBackend = taskAcquireDb(pParam->dbRefId);
if (pBackend == NULL) {
stError("s-task:%s failed to acquire db", taskStr);
taosMemoryFree(pParam->taskId);
taosMemoryFree(pParam);
return -1;
if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles,
pTask->id.idStr)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64, idStr, checkpointId);
}
if ((code = taskDbGenChkpUploadData(pParam->pTask->pBackend, ((SStreamMeta*)pParam->pMeta)->bkdChkptMgt,
pParam->chkpId, (int8_t)(pParam->type), &path, toDelFiles)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64, taskStr, pParam->chkpId);
}
if (pParam->type == DATA_UPLOAD_S3) {
if (code == 0 && (code = getCheckpointDataMeta(pParam->taskId, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", taskStr, pParam->chkpId);
if (type == DATA_UPLOAD_S3) {
if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", idStr, checkpointId);
}
}
if (code == TSDB_CODE_SUCCESS) {
code = streamTaskUploadCheckpoint(pParam->taskId, path);
code = streamTaskUploadCheckpoint(idStr, path);
if (code == TSDB_CODE_SUCCESS) {
stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", taskStr, pParam->chkpId);
stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
} else {
stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", taskStr, pParam->chkpId, path);
stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", idStr, checkpointId, path);
}
}
taskReleaseDb(pParam->dbRefId);
if (code == 0) {
if (code == TSDB_CODE_SUCCESS) {
int32_t size = taosArrayGetSize(toDelFiles);
stDebug("s-task:%s remove redundant %d files", taskStr, size);
stDebug("s-task:%s remove redundant %d files", idStr, size);
for (int i = 0; i < size; i++) {
char* pName = taosArrayGetP(toDelFiles, i);
code = deleteCheckpointFile(pParam->taskId, pName);
code = deleteCheckpointFile(idStr, pName);
if (code != 0) {
stDebug("s-task:%s failed to del file: %s", taskStr, pName);
stDebug("s-task:%s failed to remove file: %s", idStr, pName);
break;
}
}
stDebug("s-task:%s remove redundant files done", taskStr);
stDebug("s-task:%s remove redundant files in uploading checkpointId:%" PRId64 " data", idStr, checkpointId);
}
taosArrayDestroyP(toDelFiles, taosMemoryFree);
double el = (taosGetTimestampMs() - now) / 1000.0;
if (code == TSDB_CODE_SUCCESS) {
stDebug("s-task:%s remove local checkpointId:%" PRId64 " data %s", taskStr, pParam->chkpId, path);
stDebug("s-task:%s complete update checkpointId:%" PRId64 ", elapsed time:%.2fs remove local checkpoint data %s",
idStr, checkpointId, el, path);
taosRemoveDir(path);
} else {
stDebug("s-task:%s update checkpointId:%" PRId64 " keep local checkpoint data", taskStr, pParam->chkpId);
stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs",
idStr, checkpointId, el);
}
taosMemoryFree(path);
taosMemoryFree(pParam->taskId);
taosMemoryFree(pParam);
return code;
}
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId, char* taskId) {
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) {
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == DATA_UPLOAD_DISABLE) {
return 0;
@ -641,15 +632,17 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointI
return 0;
}
SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg));
arg->type = type;
arg->taskId = taosStrdup(taskId);
arg->chkpId = checkpointId;
arg->pTask = pTask;
arg->dbRefId = taskGetDBRef(pTask->pBackend);
arg->pMeta = pTask->pMeta;
int64_t dbRefId = taskGetDBRef(pTask->pBackend);
void* pBackend = taskAcquireDb(dbRefId);
if (pBackend == NULL) {
stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData", pTask->id.idStr);
return -1;
}
return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL);
int32_t code = uploadCheckpointData(pTask, checkpointId, taskGetDBRef(pTask->pBackend), type);
taskReleaseDb(dbRefId);
return code;
}
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
@ -670,6 +663,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
}
}
// TODO: monitoring the checkpoint-source msg
// send check point response to upstream task
if (code == TSDB_CODE_SUCCESS) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@ -679,27 +673,26 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
}
if (code != TSDB_CODE_SUCCESS) {
// todo: let's retry send rsp to upstream/mnode
// todo: let's retry send rsp to mnode, checkpoint-ready has monitor now
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId,
tstrerror(code));
}
}
if (code == TSDB_CODE_SUCCESS) {
code = streamTaskRemoteBackupCheckpoint(pTask, ckId);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code));
}
} else {
stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId);
}
// TODO: monitoring the checkpoint-report msg
// update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
if (code == TSDB_CODE_SUCCESS && (pTask->pMsgCb != NULL)) {
code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
if (code == TSDB_CODE_SUCCESS) {
code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId);
}
} else {
stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
}
}
// clear the checkpoint info if failed
if (code != TSDB_CODE_SUCCESS) {
} else { // clear the checkpoint info if failed
taosThreadMutexLock(&pTask->lock);
streamTaskClearCheckInfo(pTask, false);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
@ -710,7 +703,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
}
double el = (taosGetTimestampMs() - startTs) / 1000.0;
stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", id,
stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2fs, %s ", id,
pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
(code == TSDB_CODE_SUCCESS) ? "succ" : "failed");