add self check

This commit is contained in:
Yihao Deng 2024-06-28 08:14:34 +00:00
parent 2ae54486b5
commit 2e59284388
1 changed files with 91 additions and 51 deletions

View File

@ -196,28 +196,54 @@ int32_t getCfIdx(const char* cfName) {
return idx;
}
bool isValidCheckpoint(const char* dir) { return true; }
bool isValidCheckpoint(const char* dir) {
// not implement yet
return true;
}
/*
*copy pChkpIdDir's file to state dir
*/
int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
// impl later
int32_t code = 0;
int32_t cap = strlen(path) + 64;
int32_t nBytes = 0;
char* state = taosMemoryCalloc(1, cap);
if (state == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
nBytes = snprintf(state, cap, "%s%s%s", path, TD_DIRSEP, "state");
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
taosMemoryFree(state);
return -1;
}
/*param@1: checkpointId dir
param@2: state
copy pChkpIdDir's file to state dir
opt to set hard link to previous file
*/
char* state = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(state, "%s%s%s", path, TD_DIRSEP, "state");
if (chkpId != 0) {
char* chkp = taosMemoryCalloc(1, strlen(path) + 64);
sprintf(chkp, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
char* chkp = taosMemoryCalloc(1, cap);
if (chkp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(state);
return -1;
}
nBytes = snprintf(chkp, cap, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
taosMemoryFree(state);
taosMemoryFree(chkp);
return -1;
}
if (taosIsDir(chkp) && isValidCheckpoint(chkp)) {
cleanDir(state, "");
code = backendCopyFiles(chkp, state);
stInfo("copy snap file from %s to %s", chkp, state);
if (code != 0) {
stError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)));
stError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(terrno)));
} else {
stInfo("start to restart stream backend at checkpoint path: %s", chkp);
}
@ -225,7 +251,10 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
} else {
stError("failed to start stream backend at %s, reason: %s, restart from default state dir:%s", chkp,
tstrerror(TAOS_SYSTEM_ERROR(errno)), state);
taosMkDir(state);
code = taosMkDir(state);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
}
}
taosMemoryFree(chkp);
@ -247,7 +276,9 @@ typedef struct {
} SSChkpMetaOnS3;
int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) {
int32_t code = -1;
int32_t cap = strlen(path) + 32;
TdFilePtr pFile = NULL;
char* metaPath = taosMemoryCalloc(1, cap);
if (metaPath == NULL) {
@ -256,41 +287,42 @@ int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) {
}
int32_t n = sprintf(metaPath, "%s%s%s", path, TD_DIRSEP, "META");
if (n <= 0 || n >= (cap - 1)) {
if (n <= 0 || n >= cap) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(metaPath);
return -1;
}
TdFilePtr pFile = taosOpenFile(path, TD_FILE_READ);
pFile = taosOpenFile(path, TD_FILE_READ);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(metaPath);
return -1;
goto _EXIT;
}
char buf[256] = {0};
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(metaPath);
taosCloseFile(&pFile);
return -1;
goto _EXIT;
}
SSChkpMetaOnS3* p = taosMemoryCalloc(1, sizeof(SSChkpMetaOnS3));
if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT;
}
n = sscanf(buf, META_ON_S3_FORMATE, p->pCurrName, &p->currChkptId, p->pManifestName, &p->manifestChkptId,
p->processName, &p->processId);
if (n != 6) {
terrno = TSDB_CODE_INVALID_MSG;
taosMemoryFree(p);
taosMemoryFree(metaPath);
taosCloseFile(&pFile);
return -1;
goto _EXIT;
}
*pMeta = p;
code = 0;
_EXIT:
taosCloseFile(&pFile);
taosMemoryFree(metaPath);
return 0;
return code;
}
int32_t remoteChkp_validMetaFile(char* name, char* prename, int64_t chkpId) {
int8_t valid = 0;
@ -321,7 +353,6 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SSChkpMetaOnS3* pMeta, int64_t ch
return -1;
}
int8_t count = 0;
// for (int i = 0; i < taosArrayGetSize(list); i++) {
// char* p = taosArrayGetP(list, i);
// sprintf(src, "%s%s%s", path, TD_DIRSEP, p);
@ -419,7 +450,7 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId
if (taosIsDir(defaultPath)) taosRenameFile(defaultPath, tmp);
// SArray* list = taosArrayInit(2, sizeof(void*));
SSChkpMetaOnS3* pMeta;
SSChkpMetaOnS3* pMeta = NULL;
code = remoteChkp_readMetaData(chkpPath, &pMeta);
if (code == 0) code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId);
@ -481,76 +512,84 @@ int32_t backendFileCopyFilesImpl(const char* src, const char* dst) {
const char* info = "info";
size_t infoLen = strlen(info);
int32_t code = 0;
int32_t code = -1;
int32_t sLen = strlen(src);
int32_t dLen = strlen(dst);
char* srcName = taosMemoryCalloc(1, sLen + 64);
char* dstName = taosMemoryCalloc(1, dLen + 64);
int32_t cap = TMAX(sLen, dLen) + 64;
int32_t nBytes = 0;
char* srcName = taosMemoryCalloc(1, cap);
char* dstName = taosMemoryCalloc(1, cap);
if (srcName == NULL || dstName == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
// copy file to dst
TdDirPtr pDir = taosOpenDir(src);
if (pDir == NULL) {
taosMemoryFree(srcName);
taosMemoryFree(dstName);
code = TAOS_SYSTEM_ERROR(errno);
errno = 0;
return code;
terrno = TAOS_SYSTEM_ERROR(errno);
}
errno = 0;
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) {
continue;
}
sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
nBytes = snprintf(srcName, cap, "%s%s%s", src, TD_DIRSEP, name);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR;
}
nBytes = snprintf(dstName, cap, "%s%s%s", dst, TD_DIRSEP, name);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR;
}
if (strncmp(name, current, strlen(name) <= currLen ? strlen(name) : currLen) == 0) {
code = copyFiles_create(srcName, dstName, 0);
if (code != 0) {
code = TAOS_SYSTEM_ERROR(code);
stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code));
terrno = TAOS_SYSTEM_ERROR(errno);
stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(terrno));
goto _ERROR;
}
} else if (strncmp(name, info, strlen(name) <= infoLen ? strlen(name) : infoLen) == 0) {
code = copyFiles_create(srcName, dstName, 0);
if (code != 0) {
code = TAOS_SYSTEM_ERROR(code);
stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code));
terrno = TAOS_SYSTEM_ERROR(errno);
stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(terrno));
goto _ERROR;
}
} else {
code = copyFiles_hardlink(srcName, dstName, 0);
if (code != 0) {
code = TAOS_SYSTEM_ERROR(code);
stError("failed to hard link file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(code));
terrno = TAOS_SYSTEM_ERROR(errno);
stError("failed to hard link file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(terrno));
goto _ERROR;
} else {
stDebug("succ hard link file:%s to %s", srcName, dstName);
}
}
memset(srcName, 0, sLen + 64);
memset(dstName, 0, dLen + 64);
memset(srcName, 0, cap);
memset(dstName, 0, cap);
}
taosMemoryFreeClear(srcName);
taosMemoryFreeClear(dstName);
taosCloseDir(&pDir);
errno = 0;
return code;
_ERROR:
taosMemoryFreeClear(srcName);
taosMemoryFreeClear(dstName);
taosCloseDir(&pDir);
errno = 0;
return code;
}
@ -568,7 +607,8 @@ static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* ch
if (code != TSDB_CODE_SUCCESS) {
cleanDir(defaultPath, pTaskIdStr);
stError("%s failed to start stream backend from local %s, reason:%s, try download checkpoint from remote",
pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno)));
pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(terrno)));
terrno = 0;
code = TSDB_CODE_SUCCESS;
} else {
stInfo("%s copy checkpoint data from:%s to:%s succ, try to start stream backend", pTaskIdStr, checkpointPath,