From 2e59284388e264655dbb27c0db302b512c8a3e8d Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Fri, 28 Jun 2024 08:14:34 +0000 Subject: [PATCH] add self check --- source/libs/stream/src/streamBackendRocksdb.c | 142 +++++++++++------- 1 file changed, 91 insertions(+), 51 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index cbe6dcc886..eff0481d5b 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -196,28 +196,54 @@ int32_t getCfIdx(const char* cfName) { return idx; } -bool isValidCheckpoint(const char* dir) { return true; } +bool isValidCheckpoint(const char* dir) { + // not implement yet + return true; +} +/* + *copy pChkpIdDir's file to state dir + */ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { // impl later int32_t code = 0; + int32_t cap = strlen(path) + 64; + int32_t nBytes = 0; + + char* state = taosMemoryCalloc(1, cap); + if (state == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + nBytes = snprintf(state, cap, "%s%s%s", path, TD_DIRSEP, "state"); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + taosMemoryFree(state); + return -1; + } - /*param@1: checkpointId dir - param@2: state - copy pChkpIdDir's file to state dir - opt to set hard link to previous file - */ - char* state = taosMemoryCalloc(1, strlen(path) + 32); - sprintf(state, "%s%s%s", path, TD_DIRSEP, "state"); if (chkpId != 0) { - char* chkp = taosMemoryCalloc(1, strlen(path) + 64); - sprintf(chkp, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); + char* chkp = taosMemoryCalloc(1, cap); + if (chkp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(state); + return -1; + } + + nBytes = snprintf(chkp, cap, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + taosMemoryFree(state); + taosMemoryFree(chkp); + return -1; + } + if (taosIsDir(chkp) && isValidCheckpoint(chkp)) { cleanDir(state, ""); code = backendCopyFiles(chkp, state); - stInfo("copy snap file from %s to %s", chkp, state); if (code != 0) { - stError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno))); + stError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(terrno))); } else { stInfo("start to restart stream backend at checkpoint path: %s", chkp); } @@ -225,7 +251,10 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { } else { stError("failed to start stream backend at %s, reason: %s, restart from default state dir:%s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)), state); - taosMkDir(state); + code = taosMkDir(state); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + } } taosMemoryFree(chkp); @@ -247,7 +276,9 @@ typedef struct { } SSChkpMetaOnS3; int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) { - int32_t cap = strlen(path) + 32; + int32_t code = -1; + int32_t cap = strlen(path) + 32; + TdFilePtr pFile = NULL; char* metaPath = taosMemoryCalloc(1, cap); if (metaPath == NULL) { @@ -256,41 +287,42 @@ int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) { } int32_t n = sprintf(metaPath, "%s%s%s", path, TD_DIRSEP, "META"); - if (n <= 0 || n >= (cap - 1)) { + if (n <= 0 || n >= cap) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(metaPath); return -1; } - TdFilePtr pFile = taosOpenFile(path, TD_FILE_READ); + pFile = taosOpenFile(path, TD_FILE_READ); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - taosMemoryFree(metaPath); - return -1; + goto _EXIT; } char buf[256] = {0}; if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { terrno = TAOS_SYSTEM_ERROR(errno); - taosMemoryFree(metaPath); - taosCloseFile(&pFile); - return -1; + goto _EXIT; } SSChkpMetaOnS3* p = taosMemoryCalloc(1, sizeof(SSChkpMetaOnS3)); + if (p == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } n = sscanf(buf, META_ON_S3_FORMATE, p->pCurrName, &p->currChkptId, p->pManifestName, &p->manifestChkptId, p->processName, &p->processId); if (n != 6) { terrno = TSDB_CODE_INVALID_MSG; taosMemoryFree(p); - taosMemoryFree(metaPath); - taosCloseFile(&pFile); - return -1; + goto _EXIT; } - + *pMeta = p; + code = 0; +_EXIT: taosCloseFile(&pFile); taosMemoryFree(metaPath); - return 0; + return code; } int32_t remoteChkp_validMetaFile(char* name, char* prename, int64_t chkpId) { int8_t valid = 0; @@ -321,7 +353,6 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SSChkpMetaOnS3* pMeta, int64_t ch return -1; } - int8_t count = 0; // for (int i = 0; i < taosArrayGetSize(list); i++) { // char* p = taosArrayGetP(list, i); // sprintf(src, "%s%s%s", path, TD_DIRSEP, p); @@ -419,7 +450,7 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId if (taosIsDir(defaultPath)) taosRenameFile(defaultPath, tmp); // SArray* list = taosArrayInit(2, sizeof(void*)); - SSChkpMetaOnS3* pMeta; + SSChkpMetaOnS3* pMeta = NULL; code = remoteChkp_readMetaData(chkpPath, &pMeta); if (code == 0) code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId); @@ -481,76 +512,84 @@ int32_t backendFileCopyFilesImpl(const char* src, const char* dst) { const char* info = "info"; size_t infoLen = strlen(info); - int32_t code = 0; + int32_t code = -1; int32_t sLen = strlen(src); int32_t dLen = strlen(dst); - char* srcName = taosMemoryCalloc(1, sLen + 64); - char* dstName = taosMemoryCalloc(1, dLen + 64); + int32_t cap = TMAX(sLen, dLen) + 64; + int32_t nBytes = 0; + + char* srcName = taosMemoryCalloc(1, cap); + char* dstName = taosMemoryCalloc(1, cap); + if (srcName == NULL || dstName == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } // copy file to dst TdDirPtr pDir = taosOpenDir(src); if (pDir == NULL) { - taosMemoryFree(srcName); - taosMemoryFree(dstName); - code = TAOS_SYSTEM_ERROR(errno); - - errno = 0; - return code; + terrno = TAOS_SYSTEM_ERROR(errno); } errno = 0; TdDirEntryPtr de = NULL; - while ((de = taosReadDir(pDir)) != NULL) { char* name = taosGetDirEntryName(de); if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) { continue; } - sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name); - sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name); + nBytes = snprintf(srcName, cap, "%s%s%s", src, TD_DIRSEP, name); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } + + nBytes = snprintf(dstName, cap, "%s%s%s", dst, TD_DIRSEP, name); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } if (strncmp(name, current, strlen(name) <= currLen ? strlen(name) : currLen) == 0) { code = copyFiles_create(srcName, dstName, 0); if (code != 0) { - code = TAOS_SYSTEM_ERROR(code); - stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code)); + terrno = TAOS_SYSTEM_ERROR(errno); + stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(terrno)); goto _ERROR; } } else if (strncmp(name, info, strlen(name) <= infoLen ? strlen(name) : infoLen) == 0) { code = copyFiles_create(srcName, dstName, 0); if (code != 0) { - code = TAOS_SYSTEM_ERROR(code); - stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code)); + terrno = TAOS_SYSTEM_ERROR(errno); + stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(terrno)); goto _ERROR; } } else { code = copyFiles_hardlink(srcName, dstName, 0); if (code != 0) { - code = TAOS_SYSTEM_ERROR(code); - stError("failed to hard link file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(code)); + terrno = TAOS_SYSTEM_ERROR(errno); + stError("failed to hard link file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(terrno)); goto _ERROR; } else { stDebug("succ hard link file:%s to %s", srcName, dstName); } } - memset(srcName, 0, sLen + 64); - memset(dstName, 0, dLen + 64); + memset(srcName, 0, cap); + memset(dstName, 0, cap); } taosMemoryFreeClear(srcName); taosMemoryFreeClear(dstName); taosCloseDir(&pDir); - errno = 0; return code; _ERROR: taosMemoryFreeClear(srcName); taosMemoryFreeClear(dstName); taosCloseDir(&pDir); - errno = 0; return code; } @@ -568,7 +607,8 @@ static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* ch if (code != TSDB_CODE_SUCCESS) { cleanDir(defaultPath, pTaskIdStr); stError("%s failed to start stream backend from local %s, reason:%s, try download checkpoint from remote", - pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno))); + pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(terrno))); + terrno = 0; code = TSDB_CODE_SUCCESS; } else { stInfo("%s copy checkpoint data from:%s to:%s succ, try to start stream backend", pTaskIdStr, checkpointPath,