From 49ba8132c0f7388aad1c81be1e3b61e0acdfdcef Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 25 Jun 2024 12:04:10 +0000 Subject: [PATCH] add self check info --- source/libs/stream/src/streamBackendRocksdb.c | 134 +++++++++++++++--- source/libs/stream/src/streamSnapshot.c | 25 +++- 2 files changed, 139 insertions(+), 20 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4278757136..2642f608d9 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1178,6 +1178,7 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI } int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { + // vnode task->db SStreamMeta* pMeta = arg; taosThreadMutexLock(&pMeta->backendMutex); @@ -1186,27 +1187,44 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { while (pIter) { STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter; - taskDbAddRef(pTaskDb); - int64_t chkpId = pTaskDb->chkpId; - taskDbRefChkp(pTaskDb, chkpId); - code = taskDbDoCheckpoint(pTaskDb, chkpId, 0); - if (code != 0) { - taskDbUnRefChkp(pTaskDb, chkpId); + void* p = taskDbAddRef(pTaskDb); + if (p == NULL) { + terrno = 0; + pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter); + continue; } - taskDbRemoveRef(pTaskDb); + // add chkpId to in-use-ckpkIdSet + taskDbRefChkp(pTaskDb, pTaskDb->chkpId); + + code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId, ((SStreamTask*)pTaskDb->pTask)->chkInfo.processedVer); + if (code != 0) { + // remove chkpId from in-use-ckpkIdSet + taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId); + taskDbRemoveRef(pTaskDb); + code = -1; + break; + } SStreamTask* pTask = pTaskDb->pTask; SStreamTaskSnap snap = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .chkpId = pTaskDb->chkpId, .dbPrefixPath = taosStrdup(pTaskDb->path)}; + if (snap.dbPrefixPath == NULL) { + // remove chkpid from chkp-in-use set + taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId); + taskDbRemoveRef(pTaskDb); + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = -1; + break; + } taosArrayPush(pSnap, &snap); + pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter); } taosThreadMutexUnlock(&pMeta->backendMutex); - return code; } int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) { @@ -2172,23 +2190,35 @@ void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) { int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) { int32_t code = 0; + char* statePath = taosMemoryCalloc(1, strlen(path) + 128); + if (statePath == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } - char* statePath = taosMemoryCalloc(1, strlen(path) + 128); sprintf(statePath, "%s%s%s", path, TD_DIRSEP, key); if (!taosDirExist(statePath)) { code = taosMulMkDir(statePath); if (code != 0) { - stError("failed to create dir: %s, reason:%s", statePath, tstrerror(code)); + terrno = errno; + stError("failed to create dir: %s, reason:%s", statePath, tstrerror(terrno)); taosMemoryFree(statePath); return code; } } char* dbPath = taosMemoryCalloc(1, strlen(statePath) + 128); + if (dbPath == NULL) { + taosMemoryFree(statePath); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + sprintf(dbPath, "%s%s%s", statePath, TD_DIRSEP, "state"); if (!taosDirExist(dbPath)) { code = taosMulMkDir(dbPath); if (code != 0) { + terrno = errno; stError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code)); taosMemoryFree(statePath); taosMemoryFree(dbPath); @@ -2384,6 +2414,11 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char } char* buf = taosMemoryCalloc(1, strlen(pDb->path) + 128); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + sprintf(buf, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId); if (taosIsDir(buf)) { code = 0; @@ -2402,6 +2437,11 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 SBkdMgt* p = (SBkdMgt*)bkdChkpMgt; char* temp = taosMemoryCalloc(1, strlen(pDb->path) + 32); + if (temp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + sprintf(temp, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId); if (taosDirExist(temp)) { @@ -4239,14 +4279,12 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) { taosMemoryFreeClear(p->pCurrent); p->pCurrent = taosStrdup(name); - // taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); continue; } if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) { taosMemoryFreeClear(p->pManifest); p->pManifest = taosStrdup(name); - // taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); continue; } if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) { @@ -4301,31 +4339,75 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { return 0; } +void dbChkpDestroy(SDbChkp* pChkp); + SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp)); + if (p == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } + p->curChkpId = initChkpId; p->preCkptId = -1; p->pSST = taosArrayInit(64, sizeof(void*)); + if (p->pSST == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + dbChkpDestroy(p); + return NULL; + } + p->path = path; p->len = strlen(path) + 128; p->buf = taosMemoryCalloc(1, p->len); + if (p->buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } p->idx = 0; p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (p->pSstTbl[0] == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } + p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (p->pSstTbl[1] == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } p->pAdd = taosArrayInit(64, sizeof(void*)); + if (p->pAdd == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } + p->pDel = taosArrayInit(64, sizeof(void*)); + if (p->pDel == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } + p->update = 0; taosThreadRwlockInit(&p->rwLock, NULL); SArray* list = NULL; int32_t code = dbChkpGetDelta(p, initChkpId, list); + if (code != 0) { + goto _EXIT; + } return p; +_EXIT: + dbChkpDestroy(p); + return NULL; } void dbChkpDestroy(SDbChkp* pChkp) { + if (pChkp == NULL) return; + taosMemoryFree(pChkp->buf); taosMemoryFree(pChkp->path); @@ -4357,6 +4439,11 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { char* srcDir = taosMemoryCalloc(1, len); char* dstDir = taosMemoryCalloc(1, len); + if (srcBuf == NULL || dstBuf == NULL || srcDir == NULL || dstDir == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _ERROR; + } + sprintf(srcDir, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", p->curChkpId); sprintf(dstDir, "%s", dname); @@ -4375,7 +4462,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); if (taosCopyFile(srcBuf, dstBuf) < 0) { - stError("failed to copy file from %s to %s", srcBuf, dstBuf); + terrno = errno; + stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(terrno)); goto _ERROR; } } @@ -4392,7 +4480,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent); sprintf(dstBuf, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pCurrent, p->curChkpId); if (taosCopyFile(srcBuf, dstBuf) < 0) { - stError("failed to copy file from %s to %s", srcBuf, dstBuf); + terrno = errno; + stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(terrno)); goto _ERROR; } @@ -4402,7 +4491,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest); sprintf(dstBuf, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pManifest, p->curChkpId); if (taosCopyFile(srcBuf, dstBuf) < 0) { - stError("failed to copy file from %s to %s", srcBuf, dstBuf); + terrno = errno; + stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(terrno)); goto _ERROR; } @@ -4412,17 +4502,21 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { TdFilePtr pFile = taosOpenFile(dstDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { - stError("chkp failed to create meta file: %s", dstDir); + terrno = errno; + stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(terrno)); goto _ERROR; } char content[128] = {0}; snprintf(content, sizeof(content), "%s_%" PRId64 "\n%s_%" PRId64 "", p->pCurrent, p->curChkpId, p->pManifest, p->curChkpId); if (taosWriteFile(pFile, content, strlen(content)) <= 0) { - stError("chkp failed to write meta file: %s", dstDir); + terrno = errno; + stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(terrno)); taosCloseFile(&pFile); + code = -1; goto _ERROR; } + taosCloseFile(&pFile); // clear delta data buf @@ -4471,6 +4565,12 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, if (pChkp == NULL) { char* path = taosMemoryCalloc(1, strlen(bm->path) + 64); + if (path == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosThreadRwlockUnlock(&bm->rwLock); + return -1; + } + sprintf(path, "%s%s%s", bm->path, TD_DIRSEP, taskId); SDbChkp* p = dbChkpCreate(path, chkpId); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index adefe97f1f..868ff002bf 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -24,6 +24,7 @@ enum SBackendFileType { ROCKSDB_SST_TYPE = 3, ROCKSDB_CURRENT_TYPE = 4, ROCKSDB_CHECKPOINT_META_TYPE = 5, + ROCKSDB_CHECKPOINT_SELFCHECK_TYPE = 6, }; typedef struct SBackendFileItem { @@ -49,6 +50,7 @@ typedef struct SBackendSnapFiles2 { char* pOptions; SArray* pSst; char* pCheckpointMeta; + char* pCheckpointSelfcheck; char* path; int64_t checkpointId; @@ -111,6 +113,7 @@ const char* ROCKSDB_MAINFEST = "MANIFEST"; const char* ROCKSDB_SST = "sst"; const char* ROCKSDB_CURRENT = "CURRENT"; const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT"; +const char* ROCKSDB_CHECKPOINT_SELF_CHECK = "info"; static int64_t kBlockSize = 64 * 1024; int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, void* pMeta); @@ -127,6 +130,7 @@ int32_t streamGetFileSize(char* path, char* name, int64_t* sz) { int32_t ret = 0; char* fullname = taosMemoryCalloc(1, strlen(path) + 32); + sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name); ret = taosStatFile(fullname, sz, NULL, NULL); @@ -148,7 +152,8 @@ int32_t streamDestroyTaskDbSnapInfo(void* arg, SArray* snap) { return taskDbDest void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { if (qDebugFlag & DEBUG_DEBUG) { - char* buf = taosMemoryCalloc(1, 512); + int16_t cap = 511; + char* buf = taosMemoryCalloc(1, cap + 1); sprintf(buf + strlen(buf), "["); if (pSnapFile->pCurrent) sprintf(buf, "current: %s,", pSnapFile->pCurrent); @@ -157,10 +162,10 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { if (pSnapFile->pSst) { for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { char* name = taosArrayGetP(pSnapFile->pSst, i); - sprintf(buf + strlen(buf), "%s,", name); + if (strlen(buf) + strlen(name) < cap) sprintf(buf + strlen(buf), "%s,", name); } } - sprintf(buf + strlen(buf) - 1, "]"); + if ((strlen(buf)) < cap) sprintf(buf + strlen(buf) - 1, "]"); stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId, pSnapFile->snapInfo.taskId, buf); @@ -199,6 +204,13 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) { // meta item.name = pSnapFile->pCheckpointMeta; item.type = ROCKSDB_CHECKPOINT_META_TYPE; + if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) { + taosArrayPush(pSnapFile->pFileList, &item); + } + + item.name = pSnapFile->pCheckpointSelfcheck; + item.type = ROCKSDB_CHECKPOINT_SELFCHECK_TYPE; + if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) { taosArrayPush(pSnapFile->pFileList, &item); } @@ -231,6 +243,11 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { pSnapFile->pCheckpointMeta = taosStrdup(name); continue; } + if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_SELF_CHECK) && + 0 == strncmp(name, ROCKSDB_CHECKPOINT_SELF_CHECK, strlen(ROCKSDB_CHECKPOINT_SELF_CHECK))) { + pSnapFile->pCheckpointSelfcheck = taosStrdup(name); + continue; + } if (strlen(name) >= strlen(ROCKSDB_SST) && 0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) { char* sst = taosStrdup(name); @@ -276,6 +293,7 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) { taosMemoryFree(pSnap->pMainfest); taosMemoryFree(pSnap->pOptions); taosMemoryFree(pSnap->path); + taosMemoryFree(pSnap->pCheckpointSelfcheck); for (int32_t i = 0; i < taosArrayGetSize(pSnap->pSst); i++) { char* sst = taosArrayGetP(pSnap->pSst, i); taosMemoryFree(sst); @@ -298,6 +316,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap)); int32_t code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet); if (code != 0) { + stError("failed to do task db snap info, reason:%s", tstrerror(terrno)); taosArrayDestroy(pSnapInfoSet); return -1; }