diff --git a/include/os/osFile.h b/include/os/osFile.h index 503535a454..4d760a791f 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -117,6 +117,8 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName); int32_t taosSetFileHandlesLimit(); +int32_t taosLinkFile(char *src, char *dst); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4b2a9e4a65..6b1bd759da 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -382,6 +382,99 @@ int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* d return -1; } +int32_t copyFiles_create(char* src, char* dst, int8_t type) { + // create and copy file + return taosCopyFile(src, dst); +} +int32_t copyFiles_hardlink(char* src, char* dst, int8_t type) { + // same fs and hard link + return taosLinkFile(src, dst); +} + +int32_t backendFileCopyFilesImpl(char* src, char* dst) { + int32_t code = 0; + int32_t sLen = strlen(src); + int32_t dLen = strlen(dst); + char* srcName = taosMemoryCalloc(1, sLen + 64); + char* dstName = taosMemoryCalloc(1, dLen + 64); + // copy file to dst + + TdDirPtr pDir = taosOpenDir(src); + if (pDir == NULL) { + taosMemoryFree(srcName); + taosMemoryFree(dstName); + return -1; + } + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { + char* name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; + + sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name); + sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name); + + if (strcmp(name, "CURRENT") == 0) { + code = copyFiles_create(srcName, dstName, 0); + } else { + code = copyFiles_hardlink(srcName, dstName, 0); + } + if (code != 0) { + goto _ERROR; + } + memset(srcName, 0, sLen + 64); + memset(dstName, 0, dLen + 64); + } + + taosCloseDir(&pDir); + return 0; +_ERROR: + taosMemoryFreeClear(srcName); + taosMemoryFreeClear(dstName); + taosCloseDir(&pDir); + return -1; +} +int32_t backendCopyFiles(char* src, char* dst) { + return backendFileCopyFilesImpl(src, dst); + // // opt later, just hard link + // int32_t sLen = strlen(src); + // int32_t dLen = strlen(dst); + // char* srcName = taosMemoryCalloc(1, sLen + 64); + // char* dstName = taosMemoryCalloc(1, dLen + 64); + + // TdDirPtr pDir = taosOpenDir(src); + // if (pDir == NULL) { + // taosMemoryFree(srcName); + // taosMemoryFree(dstName); + // return -1; + // } + + // TdDirEntryPtr de = NULL; + // while ((de = taosReadDir(pDir)) != NULL) { + // char* name = taosGetDirEntryName(de); + // if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; + + // sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name); + // sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name); + // // if (!taosDirEntryIsDir(de)) { + // // // code = taosCopyFile(srcName, dstName); + // // if (code == -1) { + // // goto _err; + // // } + // // } + // return backendFileCopyFilesImpl(src, dst); + + // memset(srcName, 0, sLen + 64); + // memset(dstName, 0, dLen + 64); + // } + + // _err: + // taosMemoryFreeClear(srcName); + // taosMemoryFreeClear(dstName); + // taosCloseDir(&pDir); + // return code >= 0 ? 0 : -1; + + // return 0; +} int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { int32_t code = -1; int32_t len = strlen(defaultPath) + 32; @@ -396,7 +489,7 @@ int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* de taosRemoveDir(tmp); } taosMkDir(defaultPath); - code = copyFiles(chkpPath, defaultPath); + code = backendCopyFiles(chkpPath, defaultPath); if (code != 0) { stError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno))); } else { diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 39de117339..74ce5e5aa6 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -18,8 +18,8 @@ #include "zlib.h" #ifdef WINDOWS -#include #include +#include #include #include #define F_OK 0 @@ -50,7 +50,7 @@ typedef struct TdFile { TdThreadRwlock rwlock; int refId; HANDLE hFile; - FILE* fp; + FILE *fp; int32_t tdFileOptions; } TdFile; #else @@ -230,7 +230,7 @@ int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime, int32_t *a int32_t code = _stati64(path, &fileStat); #else struct stat fileStat; - int32_t code = stat(path, &fileStat); + int32_t code = stat(path, &fileStat); #endif if (code < 0) { return code; @@ -274,7 +274,7 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) { return -1; } struct stat fileStat; - int32_t code = fstat(pFile->fd, &fileStat); + int32_t code = fstat(pFile->fd, &fileStat); if (code < 0) { printf("taosFStatFile run fstat fail."); return code; @@ -374,7 +374,7 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) { DWORD bytesRead; if (!ReadFile(pFile->hFile, buf, count, &bytesRead, NULL)) { bytesRead = -1; - } + } #if FILE_WITH_LOCK taosThreadRwlockUnlock(&(pFile->rwlock)); #endif @@ -389,7 +389,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) { taosThreadRwlockWrlock(&(pFile->rwlock)); #endif - DWORD bytesWritten; + DWORD bytesWritten; if (!WriteFile(pFile->hFile, buf, count, &bytesWritten, NULL)) { bytesWritten = -1; } @@ -666,7 +666,7 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) { } int64_t leftbytes = count; int64_t readbytes; - char * tbuf = (char *)buf; + char *tbuf = (char *)buf; while (leftbytes > 0) { #ifdef WINDOWS @@ -716,7 +716,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) { int64_t nleft = count; int64_t nwritten = 0; - char * tbuf = (char *)buf; + char *tbuf = (char *)buf; while (nleft > 0) { nwritten = write(pFile->fd, (void *)tbuf, (uint32_t)nleft); @@ -1028,7 +1028,7 @@ int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, in #endif } -#endif // WINDOWS +#endif // WINDOWS TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { FILE *fp = NULL; @@ -1056,7 +1056,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { if (hFile != NULL) CloseHandle(hFile); #else if (fd >= 0) close(fd); -#endif +#endif if (fp != NULL) fclose(fp); return NULL; } @@ -1067,7 +1067,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { pFile->fp = fp; pFile->refId = 0; - #ifdef WINDOWS +#ifdef WINDOWS pFile->hFile = hFile; pFile->tdFileOptions = tdFileOptions; // do nothing, since the property of pmode is set with _O_TEMPORARY; the OS will recycle @@ -1137,7 +1137,7 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset) #endif return -1; } - DWORD ret = 0; + DWORD ret = 0; OVERLAPPED ol = {0}; ol.OffsetHigh = (uint32_t)((offset & 0xFFFFFFFF00000000LL) >> 0x20); ol.Offset = (uint32_t)(offset & 0xFFFFFFFFLL); @@ -1179,7 +1179,7 @@ int32_t taosFsyncFile(TdFilePtr pFile) { if (pFile->hFile != NULL) { if (pFile->tdFileOptions & TD_FILE_WRITE_THROUGH) { return 0; - } + } return !FlushFileBuffers(pFile->hFile); #else if (pFile->fd >= 0) { @@ -1204,7 +1204,7 @@ bool taosValidFile(TdFilePtr pFile) { return pFile != NULL && pFile->hFile != NULL; #else return pFile != NULL && pFile->fd > 0; -#endif +#endif } int32_t taosUmaskFile(int32_t maskVal) { @@ -1249,7 +1249,7 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf) { } bufferSize += 512; - void* newBuf = taosMemoryRealloc(*ptrBuf, bufferSize); + void *newBuf = taosMemoryRealloc(*ptrBuf, bufferSize); if (newBuf == NULL) { taosMemoryFreeClear(*ptrBuf); return -1; @@ -1363,7 +1363,7 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { cmp_end: if (pFile) { - taosCloseFile(&pFile); + taosCloseFile(&pFile); } if (pSrcFile) { taosCloseFile(&pSrcFile); @@ -1386,3 +1386,17 @@ int32_t taosSetFileHandlesLimit() { #endif return 0; } + +int32_t taosLinkFile(char *src, char *dst) { +#ifdef WINDOWS + // don nothing + return 0; +#endif + if (link(src, dst) != 0) { + if (errno == EXDEV || errno == ENOTSUP) { + return -1; + } + return errno; + } + return 0; +}