From ca1562a990059a891e1b893778a05e9c1363d485 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 2 Jul 2024 08:23:56 +0000 Subject: [PATCH] add self check --- source/libs/stream/inc/streamBackendRocksdb.h | 3 + source/libs/stream/src/streamBackendRocksdb.c | 228 ++++++++++++------ source/libs/stream/src/streamCheckpoint.c | 38 +-- 3 files changed, 162 insertions(+), 107 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 24cd861550..e4c5787020 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -131,6 +131,8 @@ typedef struct { TdThreadRwlock rwLock; } SBkdMgt; +#define META_ON_S3_FORMATE "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 "" + bool streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId); void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId); void streamBackendCleanup(void* arg); @@ -258,6 +260,7 @@ void bkdMgtDestroy(SBkdMgt* bm); int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list, const char* id); +int32_t remoteChkpGetDelFile(char* path, SArray* toDel); void* taskAcquireDb(int64_t refId); void taskReleaseDb(int64_t refId); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 1042e6dfc9..0074251669 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -19,8 +19,6 @@ #include "tcommon.h" #include "tref.h" -#define META_ON_S3_FORMATE "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 "" - typedef struct SCompactFilteFactory { void* status; } SCompactFilteFactory; @@ -152,6 +150,9 @@ static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const cha void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp); void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp); +int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId); +int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId); + #define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX)); int32_t copyFiles(const char* src, const char* dst); uint32_t nextPow2(uint32_t x); @@ -286,7 +287,7 @@ int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) { return -1; } - int32_t n = sprintf(metaPath, "%s%s%s", path, TD_DIRSEP, "META"); + int32_t n = snprintf(metaPath, cap, "%s%s%s", path, TD_DIRSEP, "META"); if (n <= 0 || n >= cap) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(metaPath); @@ -317,6 +318,12 @@ int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) { taosMemoryFree(p); goto _EXIT; } + + if (p->currChkptId != p->manifestChkptId) { + terrno = TSDB_CODE_INVALID_MSG; + taosMemoryFree(p); + goto _EXIT; + } *pMeta = p; code = 0; _EXIT: @@ -324,66 +331,100 @@ _EXIT: taosMemoryFree(metaPath); return code; } -int32_t remoteChkp_validMetaFile(char* name, char* prename, int64_t chkpId) { - int8_t valid = 0; - for (int i = 0; i < strlen(name); i++) { - if (name[i] == '_') { - memcpy(prename, name, i); - if (taosStr2int64(name + i + 1) != chkpId) { - break; - } else { - valid = 1; - } - } - } - return valid; -} + int32_t remoteChkp_validAndCvtMeta(char* path, SSChkpMetaOnS3* pMeta, int64_t chkpId) { - int32_t complete = 1; - int32_t len = strlen(path) + 32; - char* src = taosMemoryCalloc(1, len); - char* dst = taosMemoryCalloc(1, len); + int32_t code = -1; + int32_t nBytes = 0; + int32_t cap = strlen(path) + 64; + char* src = taosMemoryCalloc(1, cap); + char* dst = taosMemoryCalloc(1, cap); if (src == NULL || dst == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return code; } if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + terrno = TSDB_CODE_INVALID_CFG; + return code; } + // rename current_chkp/mainfest to current + for (int i = 0; i < 2; i++) { + char* key = (i == 0 ? pMeta->pCurrName : pMeta->pManifestName); + if (strlen(key) <= 0) { + terrno = TSDB_CODE_INVALID_PARA; + } - // for (int i = 0; i < taosArrayGetSize(list); i++) { - // char* p = taosArrayGetP(list, i); - // sprintf(src, "%s%s%s", path, TD_DIRSEP, p); + nBytes = snprintf(src, cap, "%s%s%s_%" PRId64 "", path, TD_DIRSEP, key, pMeta->currChkptId); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _EXIT; + } - // // check file exist - // if (taosStatFile(src, NULL, NULL, NULL) != 0) { - // complete = 0; - // break; - // } + if (taosStatFile(src, NULL, NULL, NULL) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + goto _EXIT; + } - // // check file name - // char temp[64] = {0}; - // if (remoteChkp_validMetaFile(p, temp, chkpId)) { - // count++; - // } + nBytes = snprintf(dst, cap, "%s%s%s", path, TD_DIRSEP, key); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _EXIT; + } - // // rename file - // sprintf(dst, "%s%s%s", path, TD_DIRSEP, temp); - // taosRenameFile(src, dst); + if (taosRenameFile(src, dst) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + goto _EXIT; + } - // memset(src, 0, len); - // memset(dst, 0, len); - // } - // if (count != taosArrayGetSize(list)) { - // complete = 0; - // } + memset(src, 0, cap); + memset(dst, 0, cap); + } + code = 0; +// rename manifest_chkp to manifest +_EXIT: taosMemoryFree(src); taosMemoryFree(dst); + return code; +} +int32_t remoteChkpGetDelFile(char* path, SArray* toDel) { + int32_t code = -1; + int32_t nBytes = 0; - return complete == 1 ? 0 : -1; + SSChkpMetaOnS3* pMeta = NULL; + code = remoteChkp_readMetaData(path, &pMeta); + if (code != 0) { + return code; + } + + for (int i = 0; i < 2; i++) { + char* key = (i == 0 ? pMeta->pCurrName : pMeta->pManifestName); + + int32_t cap = strlen(key) + 32; + char* p = taosMemoryCalloc(1, cap); + if (p == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pMeta); + return -1; + } + + nBytes = snprintf(p, cap, "%s_%" PRId64 "", key, pMeta->currChkptId); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + taosMemoryFree(pMeta); + taosMemoryFree(p); + return code; + } + if (taosArrayPush(toDel, &p) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pMeta); + taosMemoryFree(p); + return code; + } + } + code = 0; + + return code; } void cleanDir(const char* pPath, const char* id) { @@ -424,56 +465,91 @@ int32_t rebuildFromRemoteChkp_rsync(const char* key, char* checkpointPath, int64 return backendCopyFiles(checkpointPath, defaultPath); } +int32_t rebuildDataFromS3(char* chkpPath, int64_t chkpId) { + SSChkpMetaOnS3* pMeta = NULL; + + int32_t code = remoteChkp_readMetaData(chkpPath, &pMeta); + if (code != 0) { + return -1; + } + + if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) { + taosMemoryFree(pMeta); + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } + + code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId); + if (code != 0) { + taosMemoryFree(pMeta); + return -1; + } + + return chkpAddExtraInfo(chkpPath, chkpId, pMeta->processId); +} + int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { + int8_t rename = 0; int32_t code = streamTaskDownloadCheckpointData(key, chkpPath); if (code != 0) { return code; } - int32_t nBytes; int32_t cap = strlen(defaultPath) + 32; - char* tmp = taosMemoryCalloc(1, cap); - if (tmp == NULL) { + char* defaultTmp = taosMemoryCalloc(1, cap); + if (defaultTmp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - nBytes = snprintf(tmp, cap, "%s%s", defaultPath, "_tmp"); + int32_t nBytes = snprintf(defaultPath, cap, "%s%s", defaultPath, "_tmp"); if (nBytes <= 0 || nBytes >= cap) { terrno = TSDB_CODE_OUT_OF_RANGE; - taosMemoryFree(tmp); + taosMemoryFree(defaultPath); return -1; } - if (taosIsDir(tmp)) taosRemoveDir(tmp); - if (taosIsDir(defaultPath)) taosRenameFile(defaultPath, tmp); - - // SArray* list = taosArrayInit(2, sizeof(void*)); - SSChkpMetaOnS3* pMeta = NULL; - code = remoteChkp_readMetaData(chkpPath, &pMeta); - if (code == 0) code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId); - - taosMemoryFree(pMeta); - - if (code == 0) { - code = taosMkDir(defaultPath); - } - - if (code == 0) { - code = backendCopyFiles(chkpPath, defaultPath); - } - - if (code != 0) { - if (taosIsDir(defaultPath)) taosRemoveDir(defaultPath); - if (taosIsDir(tmp)) { - code = taosRenameFile(tmp, defaultPath); + if (taosIsDir(defaultTmp)) taosRemoveDir(defaultTmp); + if (taosIsDir(defaultPath)) { + code = taosRenameFile(defaultPath, defaultTmp); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + goto _EXIT; + } else { + rename = 1; } } else { - taosRemoveDir(tmp); + code = taosMkDir(defaultPath); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + goto _EXIT; + } } - taosMemoryFree(tmp); + code = rebuildDataFromS3(chkpPath, chkpId); + if (code != 0) { + goto _EXIT; + } + + code = backendCopyFiles(chkpPath, defaultPath); + if (code != 0) { + goto _EXIT; + } + code = 0; + +_EXIT: + if (code != 0) { + if (rename) { + taosRenameFile(defaultTmp, defaultPath); + } + } + + if (taosIsDir(defaultPath)) { + taosRemoveDir(defaultPath); + } + + taosMemoryFree(defaultTmp); return code; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index bc5067d4d6..8b75e74d3b 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -542,7 +542,7 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { TdFilePtr pFile = NULL; - int32_t cap = strlen(path) + 32; + int32_t cap = strlen(path) + 64; char buf[128] = {0}; int32_t code = 0; @@ -553,7 +553,7 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l } int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP"); - if (nBytes != strlen(filePath)) { + if (nBytes <= 0 || nBytes >= cap) { taosMemoryFree(filePath); terrno = TSDB_CODE_OUT_OF_RANGE; return -1; @@ -561,41 +561,17 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l code = downloadCheckpointDataByName(id, "META", filePath); if (code != 0) { - stDebug("%s chkp failed to download meta file:%s", id, filePath); + stError("%s chkp failed to download meta file:%s", id, filePath); taosMemoryFree(filePath); return code; } - pFile = taosOpenFile(filePath, TD_FILE_READ); - if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - stError("%s failed to open meta file:%s for checkpoint", id, filePath); + code = remoteChkpGetDelFile(filePath, list); + if (code != 0) { + stError("%s chkp failed to get to del:%s", id, filePath); taosMemoryFree(filePath); - return -1; } - - if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { - stError("%s failed to read meta file:%s for checkpoint", id, filePath); - code = -1; - } else { - int32_t len = strnlen(buf, tListLen(buf)); - for (int i = 0; i < len; i++) { - if (buf[i] == '\n') { - char* item = taosMemoryCalloc(1, i + 1); - memcpy(item, buf, i); - taosArrayPush(list, &item); - - item = taosMemoryCalloc(1, len - i); - memcpy(item, buf + i + 1, len - i - 1); - taosArrayPush(list, &item); - } - } - } - - taosCloseFile(&pFile); - taosRemoveFile(filePath); - taosMemoryFree(filePath); - return code; + return 0; } int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {