diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index a1f632d8c2..b643e7186c 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -321,7 +321,25 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) { return complete == 1 ? 0 : -1; } -int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { +int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { + // impl later + int32_t code = 0; + if (taosIsDir(chkpPath)) { + taosRemoveDir(chkpPath); + } + if (taosIsDir(defaultPath)) { + taosRemoveDir(defaultPath); + } + + code = downloadCheckpoint(key, chkpPath); + if (code != 0) { + return code; + } + code = copyFiles(chkpPath, defaultPath); + + return code; +} +int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { int32_t code = downloadCheckpoint(key, chkpPath); if (code != 0) { return code; @@ -355,6 +373,15 @@ int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* d taosMemoryFree(tmp); return code; } +int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { + UPLOAD_TYPE type = getUploadType(); + if (type == UPLOAD_S3) { + return rebuildFromRemoteChkp_s3(key, chkpPath, chkpId, defaultPath); + } else if (type == UPLOAD_RSYNC) { + return rebuildFromRemoteChkp_rsync(key, chkpPath, chkpId, defaultPath); + } + return -1; +} int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { int32_t code = -1; @@ -944,7 +971,7 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI sprintf(pChkpIdDir, "%s%s%s%" PRId64, pChkpDir, TD_DIRSEP, "checkpoint", chkpId); if (taosIsDir(pChkpIdDir)) { stInfo("stream rm exist checkpoint%s", pChkpIdDir); - taosRemoveFile(pChkpIdDir); + taosRemoveDir(pChkpIdDir); } *chkpDir = pChkpDir; *chkpIdDir = pChkpIdDir; @@ -1848,22 +1875,17 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char if (taosAcquireRef(taskDbWrapperId, refId) == NULL) { return -1; } - char* pChkpDir = NULL; - char* pChkpIdDir = NULL; - if (chkpPreBuildDir(pDb->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) { - code = -1; - } - if (taosIsDir(pChkpIdDir) && isValidCheckpoint(pChkpIdDir)) { + char* buf = taosMemoryCalloc(1, strlen(pDb->path) + 128); + sprintf(buf, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId); + if (taosIsDir(buf)) { code = 0; - *path = pChkpIdDir; - pChkpIdDir = NULL; + *path = buf; + } else { + taosMemoryFree(buf); } - taosMemoryFree(pChkpDir); - taosMemoryFree(pChkpIdDir); taosReleaseRef(taskDbWrapperId, refId); - return code; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 74c391f386..a96fbca50b 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -375,17 +375,16 @@ int32_t doUploadChkp(void* param) { SAsyncUploadArg* arg = param; char* path = NULL; int32_t code = 0; - SArray* list = taosArrayInit(4, sizeof(void*)); + SArray* toDelFiles = taosArrayInit(4, sizeof(void*)); - if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId, - (int8_t)(arg->type), &path, list)) != 0) { + (int8_t)(arg->type), &path, toDelFiles)) != 0) { stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); } - - code = getChkpMeta(arg->taskId, path, list); - if (code != 0) { - code = 0; + if (arg->type == UPLOAD_S3) { + if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) { + stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId); + } } if (code == 0 && (code = uploadCheckpoint(arg->taskId, path)) != 0) { @@ -393,23 +392,24 @@ int32_t doUploadChkp(void* param) { } if (code == 0) { - for (int i = 0; i < taosArrayGetSize(list); i++) { - char* p = taosArrayGetP(list, i); + for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) { + char* p = taosArrayGetP(toDelFiles, i); code = deleteCheckpointFile(arg->taskId, p); - stDebug("try to del file: %s", p); + stDebug("s-task:%s try to del file: %s", arg->pTask->id.idStr, p); if (code != 0) { break; } } } - taosArrayDestroyP(list, taosMemoryFree); + taosArrayDestroyP(toDelFiles, taosMemoryFree); taosRemoveDir(path); taosMemoryFree(path); + taosMemoryFree(arg->taskId); taosMemoryFree(arg); - return 0; + return code; } int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { // async upload