add self check

This commit is contained in:
Yihao Deng 2024-06-28 07:01:45 +00:00
parent 8fe57c1669
commit 2ae54486b5
1 changed files with 86 additions and 53 deletions

View File

@ -373,24 +373,24 @@ int32_t createDirIfNotExist(const char* pPath) {
}
}
int32_t rebuildFromRemoteChkp_rsync(const char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) {
int32_t rebuildFromRemoteChkp_rsync(const char* key, char* checkpointPath, int64_t checkpointId, char* defaultPath) {
int32_t code = 0;
if (taosIsDir(chkptPath)) {
taosRemoveDir(chkptPath);
stDebug("remove local checkpoint data dir:%s succ", chkptPath);
if (taosIsDir(checkpointPath)) {
taosRemoveDir(checkpointPath);
stDebug("remove local checkpoint data dir:%s succ", checkpointPath);
}
cleanDir(defaultPath, key);
stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath);
code = streamTaskDownloadCheckpointData(key, chkptPath);
code = streamTaskDownloadCheckpointData(key, checkpointPath);
if (code != 0) {
stError("failed to download checkpoint data:%s", key);
return code;
}
stDebug("download remote checkpoint data for checkpointId:%" PRId64 ", %s", checkpointId, key);
return backendCopyFiles(chkptPath, defaultPath);
return backendCopyFiles(checkpointPath, defaultPath);
}
int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
@ -399,29 +399,45 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId
return code;
}
int32_t len = strlen(defaultPath) + 32;
char* tmp = taosMemoryCalloc(1, len);
sprintf(tmp, "%s%s", defaultPath, "_tmp");
int32_t nBytes;
int32_t cap = strlen(defaultPath) + 32;
char* tmp = taosMemoryCalloc(1, cap);
if (tmp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
nBytes = snprintf(tmp, cap, "%s%s", defaultPath, "_tmp");
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
taosMemoryFree(tmp);
return -1;
}
if (taosIsDir(tmp)) taosRemoveDir(tmp);
if (taosIsDir(defaultPath)) taosRenameFile(defaultPath, tmp);
// SArray* list = taosArrayInit(2, sizeof(void*));
SSChkpMetaOnS3* pMeta;
code = remoteChkp_readMetaData(chkpPath, &pMeta);
if (code == 0) {
code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId);
}
if (code == 0) code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId);
taosMemoryFree(pMeta);
// taosArrayDestroyP(list, taosMemoryFree);
if (code == 0) {
taosMkDir(defaultPath);
code = taosMkDir(defaultPath);
}
if (code == 0) {
code = backendCopyFiles(chkpPath, defaultPath);
}
if (code != 0) {
if (taosIsDir(defaultPath)) taosRemoveDir(defaultPath);
if (taosIsDir(tmp)) taosRenameFile(tmp, defaultPath);
if (taosIsDir(tmp)) {
code = taosRenameFile(tmp, defaultPath);
}
} else {
taosRemoveDir(tmp);
}
@ -430,12 +446,12 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId
return code;
}
int32_t rebuildFromRemoteCheckpoint(const char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) {
int32_t rebuildFromRemoteCheckpoint(const char* key, char* checkpointPath, int64_t checkpointId, char* defaultPath) {
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == DATA_UPLOAD_S3) {
return rebuildFromRemoteChkp_s3(key, chkptPath, checkpointId, defaultPath);
return rebuildFromRemoteChkp_s3(key, checkpointPath, checkpointId, defaultPath);
} else if (type == DATA_UPLOAD_RSYNC) {
return rebuildFromRemoteChkp_rsync(key, chkptPath, checkpointId, defaultPath);
return rebuildFromRemoteChkp_rsync(key, checkpointPath, checkpointId, defaultPath);
} else {
stError("%s no remote backup checkpoint data for:%" PRId64, key, checkpointId);
}
@ -570,69 +586,78 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
int64_t* processVer) {
int32_t code = -1;
size_t pathLen = strlen(path);
char* prefixPath = NULL;
char* defaultPath = NULL;
char* checkpointPath = NULL;
char* checkpointRoot = NULL;
int32_t cap = strlen(path) + 128;
int32_t nBytes;
// alloc buf
prefixPath = taosMemoryCalloc(1, pathLen + 64);
if (prefixPath == NULL) {
prefixPath = taosMemoryCalloc(1, cap);
defaultPath = taosMemoryCalloc(1, cap);
checkpointPath = taosMemoryCalloc(1, cap);
checkpointRoot = taosMemoryCalloc(1, cap);
if (prefixPath == NULL || defaultPath == NULL || checkpointPath == NULL || checkpointRoot == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT;
}
sprintf(prefixPath, "%s%s%s", path, TD_DIRSEP, key);
nBytes = snprintf(prefixPath, cap, "%s%s%s", path, TD_DIRSEP, key);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _EXIT;
}
code = createDirIfNotExist(prefixPath);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto _EXIT;
}
defaultPath = taosMemoryCalloc(1, pathLen + 128);
if (defaultPath == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
nBytes = snprintf(defaultPath, cap, "%s%s%s", prefixPath, TD_DIRSEP, "state");
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _EXIT;
}
sprintf(defaultPath, "%s%s%s", prefixPath, TD_DIRSEP, "state");
code = createDirIfNotExist(defaultPath);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto _EXIT;
}
char* checkpointRoot = taosMemoryCalloc(1, pathLen + 48);
if (checkpointRoot == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
nBytes = snprintf(checkpointRoot, cap, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints");
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _EXIT;
}
sprintf(checkpointRoot, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints");
code = createDirIfNotExist(checkpointRoot);
if (code != 0) {
taosMemoryFreeClear(checkpointRoot);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _EXIT;
}
taosMemoryFreeClear(checkpointRoot);
stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId);
char* chkptPath = taosMemoryCalloc(1, pathLen + 128);
if (chkptPath == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
if (chkptId > 0) {
nBytes = snprintf(checkpointPath, cap, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP,
"checkpoint", chkptId);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _EXIT;
}
if (chkptId > 0) {
snprintf(chkptPath, pathLen + 127, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP,
"checkpoint", chkptId);
code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath, processVer);
code = rebuildFromLocalCheckpoint(key, checkpointPath, chkptId, defaultPath, processVer);
if (code != 0) {
code = rebuildFromRemoteCheckpoint(key, chkptPath, chkptId, defaultPath);
terrno = 0;
code = rebuildFromRemoteCheckpoint(key, checkpointPath, chkptId, defaultPath);
}
if (code != 0) {
stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s", chkptPath,
tstrerror(code), defaultPath);
stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s, reason:%s",
checkpointPath, tstrerror(code), defaultPath, tstrerror(terrno));
code = 0; // reset the error code
}
} else { // no valid checkpoint id
@ -641,15 +666,18 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
cleanDir(defaultPath, key);
}
taosMemoryFree(chkptPath);
*dbPath = defaultPath;
*dbPrefixPath = prefixPath;
return 0;
defaultPath = NULL;
prefixPath = NULL;
code = 0;
_EXIT:
taosMemoryFree(defaultPath);
taosMemoryFree(prefixPath);
taosMemoryFree(checkpointPath);
taosMemoryFree(checkpointRoot);
return code;
}
@ -4334,7 +4362,14 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
int32_t sstLen = strlen(pSST);
memset(p->buf, 0, p->len);
sprintf(p->buf, "%s%s%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
nBytes =
snprintf(p->buf, p->len, "%s%s%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
if (nBytes <= 0 || nBytes >= p->len) {
terrno = TSDB_CODE_OUT_OF_RANGE;
taosThreadRwlockUnlock(&p->rwLock);
return -1;
}
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
@ -4518,10 +4553,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
char* srcBuf = taosMemoryCalloc(1, cap);
char* dstBuf = taosMemoryCalloc(1, cap);
char* srcDir = taosMemoryCalloc(1, cap);
char* dstDir = taosMemoryCalloc(1, cap);
if (srcBuf == NULL || dstBuf == NULL || srcDir == NULL || dstDir == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _ERROR;