diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 60738b96bf..a01fabd3a1 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -319,6 +319,11 @@ int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* d return code; } + int32_t len = strlen(defaultPath) + 32; + char* tmp = taosMemoryCalloc(1, len); + sprintf(tmp, "%s%s%s", defaultPath, TD_DIRSEP, "_tmp"); + if (taosIsDir(tmp)) taosRemoveDir(tmp); + SArray* list = taosArrayInit(2, sizeof(void*)); code = remoteChkp_readMetaData(chkpPath, list); if (code == 0) { @@ -328,19 +333,31 @@ int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* d if (code == 0) { if (taosIsDir(defaultPath)) { - taosRemoveDir(defaultPath); + taosRenameFile(defaultPath, tmp); } taosMkDir(defaultPath); code = copyFiles(chkpPath, defaultPath); } + + if (code != 0) { + if (taosIsDir(defaultPath)) taosRemoveDir(defaultPath); + if (taosIsDir(tmp)) taosRenameFile(tmp, defaultPath); + } return code; } int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { - int32_t code = 0; + int32_t code = -1; + int32_t len = strlen(defaultPath) + 32; + char* tmp = taosMemoryCalloc(1, len); + sprintf(tmp, "%s%s%s", defaultPath, TD_DIRSEP, "_tmp"); + if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) { + if (taosIsDir(tmp)) { + taosRemoveDir(tmp); + } if (taosIsDir(defaultPath)) { - taosRemoveDir(defaultPath); + taosRenameFile(defaultPath, tmp); } taosMkDir(defaultPath); code = copyFiles(chkpPath, defaultPath); @@ -349,10 +366,14 @@ int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* de } else { stInfo("start to restart stream backend at checkpoint path: %s", chkpPath); } - - return code; } - return -1; + if (code != 0) { + if (taosIsDir(defaultPath)) taosRemoveDir(defaultPath); + if (taosIsDir(tmp)) taosRenameFile(tmp, defaultPath); + } + + taosMemoryFree(tmp); + return code; } int32_t rebuildFromlocalDefault(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {