add self check

This commit is contained in:
Yihao Deng 2024-07-02 08:23:56 +00:00
parent 6c6bff611a
commit ca1562a990
3 changed files with 162 additions and 107 deletions

View File

@ -131,6 +131,8 @@ typedef struct {
TdThreadRwlock rwLock;
} SBkdMgt;
#define META_ON_S3_FORMATE "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 ""
bool streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId);
void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId);
void streamBackendCleanup(void* arg);
@ -258,6 +260,7 @@ void bkdMgtDestroy(SBkdMgt* bm);
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list,
const char* id);
int32_t remoteChkpGetDelFile(char* path, SArray* toDel);
void* taskAcquireDb(int64_t refId);
void taskReleaseDb(int64_t refId);

View File

@ -19,8 +19,6 @@
#include "tcommon.h"
#include "tref.h"
#define META_ON_S3_FORMATE "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 ""
typedef struct SCompactFilteFactory {
void* status;
} SCompactFilteFactory;
@ -152,6 +150,9 @@ static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const cha
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId);
int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId);
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
int32_t copyFiles(const char* src, const char* dst);
uint32_t nextPow2(uint32_t x);
@ -286,7 +287,7 @@ int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) {
return -1;
}
int32_t n = sprintf(metaPath, "%s%s%s", path, TD_DIRSEP, "META");
int32_t n = snprintf(metaPath, cap, "%s%s%s", path, TD_DIRSEP, "META");
if (n <= 0 || n >= cap) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(metaPath);
@ -317,6 +318,12 @@ int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) {
taosMemoryFree(p);
goto _EXIT;
}
if (p->currChkptId != p->manifestChkptId) {
terrno = TSDB_CODE_INVALID_MSG;
taosMemoryFree(p);
goto _EXIT;
}
*pMeta = p;
code = 0;
_EXIT:
@ -324,66 +331,100 @@ _EXIT:
taosMemoryFree(metaPath);
return code;
}
/*
 * Check whether a meta file name of the form "<prefix>_<id>" carries the
 * expected checkpoint id.
 *
 * The name is scanned left to right. At every '_' the bytes preceding it are
 * copied into `prename` and the text following it is parsed with
 * taosStr2int64() and compared against `chkpId`. Scanning stops at the first
 * '_' whose suffix does not match; a match at an earlier '_' is still
 * remembered.
 *
 * name     NUL-terminated file name to inspect.
 * prename  output buffer receiving the prefix bytes. No terminator is written
 *          here, so the caller must pass a zero-filled buffer large enough
 *          for the longest prefix.
 * chkpId   checkpoint id the suffix is expected to equal.
 *
 * Returns 1 if a suffix matched `chkpId`, otherwise 0.
 */
int32_t remoteChkp_validMetaFile(char* name, char* prename, int64_t chkpId) {
  int8_t matched = 0;
  size_t nameLen = strlen(name);

  for (size_t pos = 0; pos < nameLen; pos++) {
    if (name[pos] != '_') {
      continue;
    }

    // everything before the separator is the candidate prefix
    memcpy(prename, name, pos);

    if (taosStr2int64(name + pos + 1) != chkpId) {
      break;
    }
    matched = 1;
  }

  return matched;
}
int32_t remoteChkp_validAndCvtMeta(char* path, SSChkpMetaOnS3* pMeta, int64_t chkpId) {
int32_t complete = 1;
int32_t len = strlen(path) + 32;
char* src = taosMemoryCalloc(1, len);
char* dst = taosMemoryCalloc(1, len);
int32_t code = -1;
int32_t nBytes = 0;
int32_t cap = strlen(path) + 64;
char* src = taosMemoryCalloc(1, cap);
char* dst = taosMemoryCalloc(1, cap);
if (src == NULL || dst == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
return code;
}
if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
terrno = TSDB_CODE_INVALID_CFG;
return code;
}
// rename "<current>_<chkpId>" and "<manifest>_<chkpId>" back to their plain names
for (int i = 0; i < 2; i++) {
char* key = (i == 0 ? pMeta->pCurrName : pMeta->pManifestName);
if (strlen(key) <= 0) {
terrno = TSDB_CODE_INVALID_PARA;
}
// for (int i = 0; i < taosArrayGetSize(list); i++) {
// char* p = taosArrayGetP(list, i);
// sprintf(src, "%s%s%s", path, TD_DIRSEP, p);
nBytes = snprintf(src, cap, "%s%s%s_%" PRId64 "", path, TD_DIRSEP, key, pMeta->currChkptId);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _EXIT;
}
// // check file exist
// if (taosStatFile(src, NULL, NULL, NULL) != 0) {
// complete = 0;
// break;
// }
if (taosStatFile(src, NULL, NULL, NULL) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto _EXIT;
}
// // check file name
// char temp[64] = {0};
// if (remoteChkp_validMetaFile(p, temp, chkpId)) {
// count++;
// }
nBytes = snprintf(dst, cap, "%s%s%s", path, TD_DIRSEP, key);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _EXIT;
}
// // rename file
// sprintf(dst, "%s%s%s", path, TD_DIRSEP, temp);
// taosRenameFile(src, dst);
if (taosRenameFile(src, dst) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto _EXIT;
}
// memset(src, 0, len);
// memset(dst, 0, len);
// }
// if (count != taosArrayGetSize(list)) {
// complete = 0;
// }
memset(src, 0, cap);
memset(dst, 0, cap);
}
code = 0;
// rename manifest_chkp to manifest
_EXIT:
taosMemoryFree(src);
taosMemoryFree(dst);
return code;
}
/*
 * Build the list of checkpoint-suffixed meta file names described by the
 * META data found under `path`.
 *
 * The meta data is loaded with remoteChkp_readMetaData(). For the "current"
 * and the "manifest" entry a heap string "<name>_<currChkptId>" is created
 * and its pointer is pushed onto `toDel`.
 *
 * path   location handed to remoteChkp_readMetaData().
 * toDel  array of char* that receives the generated names. Entries pushed
 *        before a failure stay in the array; the caller presumably owns and
 *        frees all entries -- confirm against the call site.
 *
 * Returns 0 on success. If the meta data cannot be read, that call's error
 * code is returned; any later failure returns -1 with terrno set.
 */
int32_t remoteChkpGetDelFile(char* path, SArray* toDel) {
  SSChkpMetaOnS3* pMeta = NULL;

  int32_t code = remoteChkp_readMetaData(path, &pMeta);
  if (code != 0) {
    return code;
  }

  // From here on every early exit is a failure until the loop completes.
  code = -1;

  for (int i = 0; i < 2; i++) {
    char*   key = (i == 0 ? pMeta->pCurrName : pMeta->pManifestName);
    int32_t cap = strlen(key) + 32;

    char* p = taosMemoryCalloc(1, cap);
    if (p == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _EXIT;
    }

    int32_t nBytes = snprintf(p, cap, "%s_%" PRId64 "", key, pMeta->currChkptId);
    if (nBytes <= 0 || nBytes >= cap) {
      terrno = TSDB_CODE_OUT_OF_RANGE;
      taosMemoryFree(p);
      goto _EXIT;
    }

    if (taosArrayPush(toDel, &p) == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFree(p);
      goto _EXIT;
    }
  }
  code = 0;

_EXIT:
  // The meta data is only needed while the names are being built.
  taosMemoryFree(pMeta);
  return code;
}
void cleanDir(const char* pPath, const char* id) {
@ -424,56 +465,91 @@ int32_t rebuildFromRemoteChkp_rsync(const char* key, char* checkpointPath, int64
return backendCopyFiles(checkpointPath, defaultPath);
}
/*
 * Validate a checkpoint directory downloaded from remote storage and record
 * its extra info.
 *
 * Steps:
 *   1. load the META data with remoteChkp_readMetaData();
 *   2. require both the current and the manifest checkpoint id to equal
 *      `chkpId`;
 *   3. run remoteChkp_validAndCvtMeta() on the directory;
 *   4. call chkpAddExtraInfo() with the process id taken from the meta data.
 *
 * chkpPath  checkpoint directory to process.
 * chkpId    checkpoint id the directory is expected to hold.
 *
 * Returns the result of chkpAddExtraInfo() when all checks pass, otherwise
 * -1. terrno is set to TSDB_CODE_INVALID_PARA on an id mismatch; the other
 * failures leave terrno as set by the failing callee.
 */
int32_t rebuildDataFromS3(char* chkpPath, int64_t chkpId) {
  SSChkpMetaOnS3* pMeta = NULL;

  int32_t code = remoteChkp_readMetaData(chkpPath, &pMeta);
  if (code != 0) {
    return -1;
  }

  if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) {
    taosMemoryFree(pMeta);
    terrno = TSDB_CODE_INVALID_PARA;
    return -1;
  }

  code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId);
  if (code != 0) {
    taosMemoryFree(pMeta);
    return -1;
  }

  // Only the process id is passed on (by value), so the meta data must be
  // released here as well; returning the call result directly leaked it.
  code = chkpAddExtraInfo(chkpPath, chkpId, pMeta->processId);
  taosMemoryFree(pMeta);
  return code;
}
int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int8_t rename = 0;
int32_t code = streamTaskDownloadCheckpointData(key, chkpPath);
if (code != 0) {
return code;
}
int32_t nBytes;
int32_t cap = strlen(defaultPath) + 32;
char* tmp = taosMemoryCalloc(1, cap);
if (tmp == NULL) {
char* defaultTmp = taosMemoryCalloc(1, cap);
if (defaultTmp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
nBytes = snprintf(tmp, cap, "%s%s", defaultPath, "_tmp");
int32_t nBytes = snprintf(defaultPath, cap, "%s%s", defaultPath, "_tmp");
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
taosMemoryFree(tmp);
taosMemoryFree(defaultPath);
return -1;
}
if (taosIsDir(tmp)) taosRemoveDir(tmp);
if (taosIsDir(defaultPath)) taosRenameFile(defaultPath, tmp);
// SArray* list = taosArrayInit(2, sizeof(void*));
SSChkpMetaOnS3* pMeta = NULL;
code = remoteChkp_readMetaData(chkpPath, &pMeta);
if (code == 0) code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId);
taosMemoryFree(pMeta);
if (code == 0) {
code = taosMkDir(defaultPath);
}
if (code == 0) {
code = backendCopyFiles(chkpPath, defaultPath);
}
if (code != 0) {
if (taosIsDir(defaultPath)) taosRemoveDir(defaultPath);
if (taosIsDir(tmp)) {
code = taosRenameFile(tmp, defaultPath);
if (taosIsDir(defaultTmp)) taosRemoveDir(defaultTmp);
if (taosIsDir(defaultPath)) {
code = taosRenameFile(defaultPath, defaultTmp);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto _EXIT;
} else {
rename = 1;
}
} else {
taosRemoveDir(tmp);
code = taosMkDir(defaultPath);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto _EXIT;
}
}
taosMemoryFree(tmp);
code = rebuildDataFromS3(chkpPath, chkpId);
if (code != 0) {
goto _EXIT;
}
code = backendCopyFiles(chkpPath, defaultPath);
if (code != 0) {
goto _EXIT;
}
code = 0;
_EXIT:
if (code != 0) {
if (rename) {
taosRenameFile(defaultTmp, defaultPath);
}
}
if (taosIsDir(defaultPath)) {
taosRemoveDir(defaultPath);
}
taosMemoryFree(defaultTmp);
return code;
}

View File

@ -542,7 +542,7 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
TdFilePtr pFile = NULL;
int32_t cap = strlen(path) + 32;
int32_t cap = strlen(path) + 64;
char buf[128] = {0};
int32_t code = 0;
@ -553,7 +553,7 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
}
int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP");
if (nBytes != strlen(filePath)) {
if (nBytes <= 0 || nBytes >= cap) {
taosMemoryFree(filePath);
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
@ -561,41 +561,17 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
code = downloadCheckpointDataByName(id, "META", filePath);
if (code != 0) {
stDebug("%s chkp failed to download meta file:%s", id, filePath);
stError("%s chkp failed to download meta file:%s", id, filePath);
taosMemoryFree(filePath);
return code;
}
pFile = taosOpenFile(filePath, TD_FILE_READ);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
stError("%s failed to open meta file:%s for checkpoint", id, filePath);
code = remoteChkpGetDelFile(filePath, list);
if (code != 0) {
stError("%s chkp failed to get to del:%s", id, filePath);
taosMemoryFree(filePath);
return -1;
}
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
stError("%s failed to read meta file:%s for checkpoint", id, filePath);
code = -1;
} else {
int32_t len = strnlen(buf, tListLen(buf));
for (int i = 0; i < len; i++) {
if (buf[i] == '\n') {
char* item = taosMemoryCalloc(1, i + 1);
memcpy(item, buf, i);
taosArrayPush(list, &item);
item = taosMemoryCalloc(1, len - i);
memcpy(item, buf + i + 1, len - i - 1);
taosArrayPush(list, &item);
}
}
}
taosCloseFile(&pFile);
taosRemoveFile(filePath);
taosMemoryFree(filePath);
return code;
return 0;
}
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {