From 51e4abe2563d5d45f093d04792776386cecfc065 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Fri, 28 Jun 2024 02:58:30 +0000 Subject: [PATCH] add self check --- source/libs/stream/src/streamBackendRocksdb.c | 179 +++++++++++++----- 1 file changed, 130 insertions(+), 49 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 7ff651d190..53b45f13a2 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1354,37 +1354,45 @@ int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) return -1; } - char* pDst = taosMemoryCalloc(1, len + 64); + int32_t cap = len + 64; + char* pDst = taosMemoryCalloc(1, cap); if (pDst == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; stError("failed to alloc memory to load extra info, dir:%s", pChkpIdDir); goto _EXIT; } - nBytes = snprintf(pDst, len + 64, "%s%sinfo", pChkpIdDir, TD_DIRSEP); - if (nBytes != strlen(pDst)) { - code = -1; + nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; stError("failed to build dst to load extra info, dir:%s", pChkpIdDir); goto _EXIT; } pFile = taosOpenFile(pDst, TD_FILE_READ); if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - stError("failed to open file to load extra info, file:%s", pDst); + if (errno == ENOENT) { + // compatible with previous version + *processId = -1; + code = 0; + goto _EXIT; + } else { + terrno = TAOS_SYSTEM_ERROR(errno); + stError("failed to open file to load extra info, file:%s", pDst); + } goto _EXIT; } if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { terrno = TAOS_SYSTEM_ERROR(errno); stError("failed to read file to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno)); - code = -1; goto _EXIT; } if (sscanf(buf, "%" PRId64 " %" PRId64 "", chkpId, processId) < 2) { terrno = TSDB_CODE_INVALID_PARA; stError("failed to read file content to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno)); + goto _EXIT; } code = 0; _EXIT: @@ -1406,16 +1414,16 @@ int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) { stError("failed to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(terrno)); return -1; } - - char* pDst = taosMemoryCalloc(1, len + 64); + int32_t cap = len + 64; + char* pDst = taosMemoryCalloc(1, cap); if (pDst == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; stError("failed to alloc memory to add extra info, dir:%s", pChkpIdDir); goto _EXIT; } - nBytes = snprintf(pDst, len + 64, "%s%sinfo", pChkpIdDir, TD_DIRSEP); - if (nBytes != strlen(pDst)) { + nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP); + if (nBytes <= 0 || nBytes >= cap) { stError("failed to build dst to add extra info, dir:%s", pChkpIdDir); goto _EXIT; } @@ -1428,8 +1436,8 @@ int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) { } nBytes = snprintf(buf, sizeof(buf), "%" PRId64 " %" PRId64 "", chkpId, processId); - if (nBytes != strlen(buf)) { - code = -1; + if (nBytes <= 0 || nBytes >= sizeof(buf)) { + terrno = TSDB_CODE_OUT_OF_RANGE; stError("failed to build content to add extra info, dir:%s", pChkpIdDir); goto _EXIT; } @@ -2475,7 +2483,7 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char nBytes = snprintf(buf, cap, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId); - if (nBytes != strlen(buf)) { + if (nBytes <= 0 || nBytes >= cap) { terrno = TSDB_CODE_OUT_OF_RANGE; return -1; } @@ -4311,6 +4319,7 @@ void dbChkpDebugInfo(SDbChkp* pDb) { } } int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { + int32_t nBytes; taosThreadRwlockWrlock(&p->rwLock); p->preCkptId = p->curChkpId; @@ -4368,6 +4377,11 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { char* name = taosHashGetKey(pIter, &len); if (name != NULL && !isBkdDataMeta(name, len)) { char* fname = taosMemoryCalloc(1, len + 1); + if (fname == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosThreadRwlockUnlock(&p->rwLock); + return -1; + } strncpy(fname, name, len); taosArrayPush(p->pAdd, &fname); } @@ -4496,30 +4510,32 @@ int32_t dbChkpInit(SDbChkp* p) { } #endif int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { + static char* chkpMeta = "META"; + int32_t code = -1; + int32_t cap = p->len + 128; + taosThreadRwlockRdlock(&p->rwLock); - int32_t code = -1; - int32_t len = p->len + 128; - char* srcBuf = taosMemoryCalloc(1, len); - char* dstBuf = taosMemoryCalloc(1, len); + char* srcBuf = taosMemoryCalloc(1, cap); + char* dstBuf = taosMemoryCalloc(1, cap); - char* srcDir = taosMemoryCalloc(1, len); - char* dstDir = taosMemoryCalloc(1, len); + char* srcDir = taosMemoryCalloc(1, cap); + char* dstDir = taosMemoryCalloc(1, cap); if (srcBuf == NULL || dstBuf == NULL || srcDir == NULL || dstDir == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _ERROR; } - int nBytes = snprintf(srcDir, len, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, + int nBytes = snprintf(srcDir, cap, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", p->curChkpId); - if (nBytes != strlen(srcBuf)) { + if (nBytes <= 0 || nBytes >= cap) { terrno = TSDB_CODE_OUT_OF_RANGE; goto _ERROR; } - nBytes = snprintf(dstDir, len, "%s", dname); - if (nBytes != strlen(dstBuf)) { + nBytes = snprintf(dstDir, cap, "%s", dname); + if (nBytes <= 0 || nBytes >= cap) { terrno = TSDB_CODE_OUT_OF_RANGE; goto _ERROR; } @@ -4536,12 +4552,21 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { // add file to $name dir for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) { - memset(srcBuf, 0, len); - memset(dstBuf, 0, len); + memset(srcBuf, 0, cap); + memset(dstBuf, 0, cap); char* filename = taosArrayGetP(p->pAdd, i); - sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); + nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, filename); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } + + nBytes = snprintf(dstBuf, cap, "%s%s%s", dstDir, TD_DIRSEP, filename); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } if (taosCopyFile(srcBuf, dstBuf) < 0) { terrno = errno; @@ -4553,14 +4578,29 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { for (int i = 0; i < taosArrayGetSize(p->pDel); i++) { char* filename = taosArrayGetP(p->pDel, i); char* p = taosStrdup(filename); + if (p == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _ERROR; + } taosArrayPush(list, &p); } // copy current file to dst dir - memset(srcBuf, 0, len); - memset(dstBuf, 0, len); - sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent); - sprintf(dstBuf, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pCurrent, p->curChkpId); + memset(srcBuf, 0, cap); + memset(dstBuf, 0, cap); + + nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } + + nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pCurrent, p->curChkpId); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } + if (taosCopyFile(srcBuf, dstBuf) < 0) { terrno = errno; stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(terrno)); @@ -4568,23 +4608,37 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { } // copy manifest file to dst dir - memset(srcBuf, 0, len); - memset(dstBuf, 0, len); - sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest); - sprintf(dstBuf, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pManifest, p->curChkpId); + memset(srcBuf, 0, cap); + memset(dstBuf, 0, cap); + + nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } + + nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pManifest, p->curChkpId); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } + if (taosCopyFile(srcBuf, dstBuf) < 0) { terrno = errno; stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(terrno)); goto _ERROR; } - static char* chkpMeta = "META"; - memset(dstBuf, 0, len); - sprintf(dstDir, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta); + memset(dstBuf, 0, cap); + nBytes = snprintf(dstDir, cap, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } TdFilePtr pFile = taosOpenFile(dstDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { - terrno = errno; + terrno = TAOS_SYSTEM_ERROR(errno); stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(terrno)); goto _ERROR; } @@ -4592,23 +4646,20 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { char content[256] = {0}; nBytes = snprintf(content, sizeof(content), META_ON_S3_FORMATE, p->pCurrent, p->curChkpId, p->pManifest, p->curChkpId, "processVer", processId); - if (nBytes != strlen(content)) { - terrno = TSDB_CODE_INVALID_MSG; + if (nBytes <= 0 || nBytes >= sizeof(content)) { + terrno = TSDB_CODE_OUT_OF_RANGE; stError("chkp failed to format meta file: %s, reason: invalid msg", dstDir); taosCloseFile(&pFile); - code = -1; goto _ERROR; } nBytes = taosWriteFile(pFile, content, strlen(content)); if (nBytes != strlen(content)) { - terrno = errno; + terrno = TAOS_SYSTEM_ERROR(errno); stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(terrno)); taosCloseFile(&pFile); - code = -1; goto _ERROR; } - taosCloseFile(&pFile); // clear delta data buf @@ -4624,11 +4675,34 @@ _ERROR: taosMemoryFree(dstDir); return code; } + SBkdMgt* bkdMgtCreate(char* path) { SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt)); + if (p == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (p->pDbChkpTbl == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + bkdMgtDestroy(p); + return NULL; + } + p->path = taosStrdup(path); - taosThreadRwlockInit(&p->rwLock, NULL); + if (p->path == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + bkdMgtDestroy(p); + return NULL; + } + + if (taosThreadRwlockInit(&p->rwLock, NULL) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + bkdMgtDestroy(p); + return NULL; + } + return p; } @@ -4656,14 +4730,21 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, SDbChkp* pChkp = ppChkp != NULL ? *ppChkp : NULL; if (pChkp == NULL) { - char* path = taosMemoryCalloc(1, strlen(bm->path) + 64); + int32_t cap = strlen(bm->path) + 64; + char* path = taosMemoryCalloc(1, cap); if (path == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosThreadRwlockUnlock(&bm->rwLock); return -1; } - sprintf(path, "%s%s%s", bm->path, TD_DIRSEP, taskId); + int32_t nBytes = snprintf(path, cap, "%s%s%s", bm->path, TD_DIRSEP, taskId); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + taosMemoryFree(path); + taosThreadRwlockUnlock(&bm->rwLock); + return -1; + } SDbChkp* p = dbChkpCreate(path, chkpId); if (p == NULL) {