From 8a2e431094cbe4648887756a32f00a36f2bd65ff Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 15 Jan 2024 09:03:56 +0000 Subject: [PATCH] add hard link --- source/libs/stream/src/streamBackendRocksdb.c | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 9c51d57f92..8175b946cc 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -41,6 +41,8 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst); int32_t getCfIdx(const char* cfName); STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath); +int32_t backendCopyFiles(char* src, char* dst); + void destroyCompactFilteFactory(void* arg); void destroyCompactFilte(void* arg); const char* compactFilteFactoryName(void* arg); @@ -218,7 +220,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { taosRemoveDir(state); } taosMkDir(state); - code = copyFiles(chkp, state); + code = backendCopyFiles(chkp, state); stInfo("copy snap file from %s to %s", chkp, state); if (code != 0) { stError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno))); @@ -334,7 +336,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c if (code != 0) { return code; } - code = copyFiles(chkpPath, defaultPath); + code = backendCopyFiles(chkpPath, defaultPath); return code; } @@ -359,7 +361,7 @@ int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char if (code == 0) { taosMkDir(defaultPath); - code = copyFiles(chkpPath, defaultPath); + code = backendCopyFiles(chkpPath, defaultPath); } if (code != 0) { @@ -384,7 +386,13 @@ int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* d int32_t copyFiles_create(char* src, char* dst, int8_t type) { // create and copy file - return taosCopyFile(src, dst); + int32_t err = taosCopyFile(src, dst); + + if (errno == EXDEV || errno == ENOTSUP) { + errno = 0; + return 0; + } + return 0; } int32_t copyFiles_hardlink(char* src, char* dst, int8_t type) { // same fs and hard link @@ -392,6 +400,9 @@ int32_t copyFiles_hardlink(char* src, char* dst, int8_t type) { } int32_t backendFileCopyFilesImpl(char* src, char* dst) { + const char* current = "CURRENT"; + size_t currLen = strlen(current); + int32_t code = 0; int32_t sLen = strlen(src); int32_t dLen = strlen(dst); @@ -403,8 +414,10 @@ int32_t backendFileCopyFilesImpl(char* src, char* dst) { if (pDir == NULL) { taosMemoryFree(srcName); taosMemoryFree(dstName); + errno = 0; return -1; } + TdDirEntryPtr de = NULL; while ((de = taosReadDir(pDir)) != NULL) { char* name = taosGetDirEntryName(de); @@ -413,25 +426,33 @@ int32_t backendFileCopyFilesImpl(char* src, char* dst) { sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name); sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name); - if (strcmp(name, "CURRENT") == 0) { + if (strncmp(name, current, strlen(name) <= currLen ? strlen(name) : currLen) == 0) { code = copyFiles_create(srcName, dstName, 0); + if (code != 0) { + stError("failed to copy file, detail: %s to %s reason: %s", srcName, dstName, + tstrerror(TAOS_SYSTEM_ERROR(code))); + goto _ERROR; + } } else { code = copyFiles_hardlink(srcName, dstName, 0); - } - if (code != 0) { - stError("failed to copy file, reason: %s", tstrerror(TAOS_SYSTEM_ERROR(code))); - goto _ERROR; + if (code != 0) { + stError("failed to hard line file, detail: %s to %s, reason: %s", srcName, dstName, + tstrerror(TAOS_SYSTEM_ERROR(code))); + goto _ERROR; + } } memset(srcName, 0, sLen + 64); memset(dstName, 0, dLen + 64); } taosCloseDir(&pDir); + errno = 0; return 0; _ERROR: taosMemoryFreeClear(srcName); taosMemoryFreeClear(dstName); taosCloseDir(&pDir); + errno = 0; return -1; } int32_t backendCopyFiles(char* src, char* dst) {