Merge remote-tracking branch 'origin/fix/3_liaohj' into fix/3_liaohj

This commit is contained in:
Haojun Liao 2024-01-15 16:16:43 +08:00
commit bc657dc2d8
3 changed files with 126 additions and 17 deletions

View File

@ -117,6 +117,8 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName);
int32_t taosSetFileHandlesLimit(); int32_t taosSetFileHandlesLimit();
int32_t taosLinkFile(char *src, char *dst);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -382,6 +382,99 @@ int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* d
return -1; return -1;
} }
int32_t copyFiles_create(char* src, char* dst, int8_t type) {
// create and copy file
return taosCopyFile(src, dst);
}
int32_t copyFiles_hardlink(char* src, char* dst, int8_t type) {
// same fs and hard link
return taosLinkFile(src, dst);
}
int32_t backendFileCopyFilesImpl(char* src, char* dst) {
int32_t code = 0;
int32_t sLen = strlen(src);
int32_t dLen = strlen(dst);
char* srcName = taosMemoryCalloc(1, sLen + 64);
char* dstName = taosMemoryCalloc(1, dLen + 64);
// copy file to dst
TdDirPtr pDir = taosOpenDir(src);
if (pDir == NULL) {
taosMemoryFree(srcName);
taosMemoryFree(dstName);
return -1;
}
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
if (strcmp(name, "CURRENT") == 0) {
code = copyFiles_create(srcName, dstName, 0);
} else {
code = copyFiles_hardlink(srcName, dstName, 0);
}
if (code != 0) {
goto _ERROR;
}
memset(srcName, 0, sLen + 64);
memset(dstName, 0, dLen + 64);
}
taosCloseDir(&pDir);
return 0;
_ERROR:
taosMemoryFreeClear(srcName);
taosMemoryFreeClear(dstName);
taosCloseDir(&pDir);
return -1;
}
int32_t backendCopyFiles(char* src, char* dst) {
return backendFileCopyFilesImpl(src, dst);
// // opt later, just hard link
// int32_t sLen = strlen(src);
// int32_t dLen = strlen(dst);
// char* srcName = taosMemoryCalloc(1, sLen + 64);
// char* dstName = taosMemoryCalloc(1, dLen + 64);
// TdDirPtr pDir = taosOpenDir(src);
// if (pDir == NULL) {
// taosMemoryFree(srcName);
// taosMemoryFree(dstName);
// return -1;
// }
// TdDirEntryPtr de = NULL;
// while ((de = taosReadDir(pDir)) != NULL) {
// char* name = taosGetDirEntryName(de);
// if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
// sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
// sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
// // if (!taosDirEntryIsDir(de)) {
// // // code = taosCopyFile(srcName, dstName);
// // if (code == -1) {
// // goto _err;
// // }
// // }
// return backendFileCopyFilesImpl(src, dst);
// memset(srcName, 0, sLen + 64);
// memset(dstName, 0, dLen + 64);
// }
// _err:
// taosMemoryFreeClear(srcName);
// taosMemoryFreeClear(dstName);
// taosCloseDir(&pDir);
// return code >= 0 ? 0 : -1;
// return 0;
}
int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t code = -1; int32_t code = -1;
int32_t len = strlen(defaultPath) + 32; int32_t len = strlen(defaultPath) + 32;
@ -396,7 +489,7 @@ int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* de
taosRemoveDir(tmp); taosRemoveDir(tmp);
} }
taosMkDir(defaultPath); taosMkDir(defaultPath);
code = copyFiles(chkpPath, defaultPath); code = backendCopyFiles(chkpPath, defaultPath);
if (code != 0) { if (code != 0) {
stError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno))); stError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else { } else {

View File

@ -18,8 +18,8 @@
#include "zlib.h" #include "zlib.h"
#ifdef WINDOWS #ifdef WINDOWS
#include <io.h>
#include <WinBase.h> #include <WinBase.h>
#include <io.h>
#include <ktmw32.h> #include <ktmw32.h>
#include <windows.h> #include <windows.h>
#define F_OK 0 #define F_OK 0
@ -50,7 +50,7 @@ typedef struct TdFile {
TdThreadRwlock rwlock; TdThreadRwlock rwlock;
int refId; int refId;
HANDLE hFile; HANDLE hFile;
FILE* fp; FILE *fp;
int32_t tdFileOptions; int32_t tdFileOptions;
} TdFile; } TdFile;
#else #else
@ -230,7 +230,7 @@ int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime, int32_t *a
int32_t code = _stati64(path, &fileStat); int32_t code = _stati64(path, &fileStat);
#else #else
struct stat fileStat; struct stat fileStat;
int32_t code = stat(path, &fileStat); int32_t code = stat(path, &fileStat);
#endif #endif
if (code < 0) { if (code < 0) {
return code; return code;
@ -274,7 +274,7 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) {
return -1; return -1;
} }
struct stat fileStat; struct stat fileStat;
int32_t code = fstat(pFile->fd, &fileStat); int32_t code = fstat(pFile->fd, &fileStat);
if (code < 0) { if (code < 0) {
printf("taosFStatFile run fstat fail."); printf("taosFStatFile run fstat fail.");
return code; return code;
@ -374,7 +374,7 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
DWORD bytesRead; DWORD bytesRead;
if (!ReadFile(pFile->hFile, buf, count, &bytesRead, NULL)) { if (!ReadFile(pFile->hFile, buf, count, &bytesRead, NULL)) {
bytesRead = -1; bytesRead = -1;
} }
#if FILE_WITH_LOCK #if FILE_WITH_LOCK
taosThreadRwlockUnlock(&(pFile->rwlock)); taosThreadRwlockUnlock(&(pFile->rwlock));
#endif #endif
@ -389,7 +389,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
taosThreadRwlockWrlock(&(pFile->rwlock)); taosThreadRwlockWrlock(&(pFile->rwlock));
#endif #endif
DWORD bytesWritten; DWORD bytesWritten;
if (!WriteFile(pFile->hFile, buf, count, &bytesWritten, NULL)) { if (!WriteFile(pFile->hFile, buf, count, &bytesWritten, NULL)) {
bytesWritten = -1; bytesWritten = -1;
} }
@ -666,7 +666,7 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
} }
int64_t leftbytes = count; int64_t leftbytes = count;
int64_t readbytes; int64_t readbytes;
char * tbuf = (char *)buf; char *tbuf = (char *)buf;
while (leftbytes > 0) { while (leftbytes > 0) {
#ifdef WINDOWS #ifdef WINDOWS
@ -716,7 +716,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
int64_t nleft = count; int64_t nleft = count;
int64_t nwritten = 0; int64_t nwritten = 0;
char * tbuf = (char *)buf; char *tbuf = (char *)buf;
while (nleft > 0) { while (nleft > 0) {
nwritten = write(pFile->fd, (void *)tbuf, (uint32_t)nleft); nwritten = write(pFile->fd, (void *)tbuf, (uint32_t)nleft);
@ -1028,7 +1028,7 @@ int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, in
#endif #endif
} }
#endif // WINDOWS #endif // WINDOWS
TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
FILE *fp = NULL; FILE *fp = NULL;
@ -1056,7 +1056,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
if (hFile != NULL) CloseHandle(hFile); if (hFile != NULL) CloseHandle(hFile);
#else #else
if (fd >= 0) close(fd); if (fd >= 0) close(fd);
#endif #endif
if (fp != NULL) fclose(fp); if (fp != NULL) fclose(fp);
return NULL; return NULL;
} }
@ -1067,7 +1067,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
pFile->fp = fp; pFile->fp = fp;
pFile->refId = 0; pFile->refId = 0;
#ifdef WINDOWS #ifdef WINDOWS
pFile->hFile = hFile; pFile->hFile = hFile;
pFile->tdFileOptions = tdFileOptions; pFile->tdFileOptions = tdFileOptions;
// do nothing, since the property of pmode is set with _O_TEMPORARY; the OS will recycle // do nothing, since the property of pmode is set with _O_TEMPORARY; the OS will recycle
@ -1137,7 +1137,7 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset)
#endif #endif
return -1; return -1;
} }
DWORD ret = 0; DWORD ret = 0;
OVERLAPPED ol = {0}; OVERLAPPED ol = {0};
ol.OffsetHigh = (uint32_t)((offset & 0xFFFFFFFF00000000LL) >> 0x20); ol.OffsetHigh = (uint32_t)((offset & 0xFFFFFFFF00000000LL) >> 0x20);
ol.Offset = (uint32_t)(offset & 0xFFFFFFFFLL); ol.Offset = (uint32_t)(offset & 0xFFFFFFFFLL);
@ -1179,7 +1179,7 @@ int32_t taosFsyncFile(TdFilePtr pFile) {
if (pFile->hFile != NULL) { if (pFile->hFile != NULL) {
if (pFile->tdFileOptions & TD_FILE_WRITE_THROUGH) { if (pFile->tdFileOptions & TD_FILE_WRITE_THROUGH) {
return 0; return 0;
} }
return !FlushFileBuffers(pFile->hFile); return !FlushFileBuffers(pFile->hFile);
#else #else
if (pFile->fd >= 0) { if (pFile->fd >= 0) {
@ -1204,7 +1204,7 @@ bool taosValidFile(TdFilePtr pFile) {
return pFile != NULL && pFile->hFile != NULL; return pFile != NULL && pFile->hFile != NULL;
#else #else
return pFile != NULL && pFile->fd > 0; return pFile != NULL && pFile->fd > 0;
#endif #endif
} }
int32_t taosUmaskFile(int32_t maskVal) { int32_t taosUmaskFile(int32_t maskVal) {
@ -1249,7 +1249,7 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf) {
} }
bufferSize += 512; bufferSize += 512;
void* newBuf = taosMemoryRealloc(*ptrBuf, bufferSize); void *newBuf = taosMemoryRealloc(*ptrBuf, bufferSize);
if (newBuf == NULL) { if (newBuf == NULL) {
taosMemoryFreeClear(*ptrBuf); taosMemoryFreeClear(*ptrBuf);
return -1; return -1;
@ -1363,7 +1363,7 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
cmp_end: cmp_end:
if (pFile) { if (pFile) {
taosCloseFile(&pFile); taosCloseFile(&pFile);
} }
if (pSrcFile) { if (pSrcFile) {
taosCloseFile(&pSrcFile); taosCloseFile(&pSrcFile);
@ -1386,3 +1386,17 @@ int32_t taosSetFileHandlesLimit() {
#endif #endif
return 0; return 0;
} }
int32_t taosLinkFile(char *src, char *dst) {
#ifdef WINDOWS
// don nothing
return 0;
#endif
if (link(src, dst) != 0) {
if (errno == EXDEV || errno == ENOTSUP) {
return -1;
}
return errno;
}
return 0;
}