From 33aef6ddc550808185e720e2093d920ab35228ac Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 25 Jun 2024 07:12:09 +0000 Subject: [PATCH] add self check info --- source/libs/stream/inc/streamBackendRocksdb.h | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 105 +++++++++++++++--- source/libs/stream/src/streamMeta.c | 28 +++-- 3 files changed, 104 insertions(+), 31 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index ebeedcb5d2..24cd861550 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -141,7 +141,7 @@ SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst); -STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId); +STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer); void taskDbDestroy(void* pBackend, bool flush); void taskDbDestroy2(void* pBackend); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4915d4b122..4278757136 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -326,9 +326,11 @@ void cleanDir(const char* pPath, const char* id) { } } -void validateDir(const char* pPath) { +int32_t createDirIfNotExist(const char* pPath) { if (!taosIsDir(pPath)) { - taosMulMkDir(pPath); + return taosMulMkDir(pPath); + } else { + return 0; } } @@ -419,6 +421,9 @@ int32_t backendFileCopyFilesImpl(const char* src, const char* dst) { const char* current = "CURRENT"; size_t currLen = strlen(current); + const char* info = "info"; + size_t infoLen = strlen(info); + int32_t code = 0; int32_t sLen = strlen(src); int32_t dLen = strlen(dst); @@ -455,6 +460,14 @@ int32_t backendFileCopyFilesImpl(const char* src, const char* dst) { stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code)); goto _ERROR; } + } else if (strncmp(name, info, strlen(name) <= infoLen ? strlen(name) : infoLen) == 0) { + code = copyFiles_create(srcName, dstName, 0); + if (code != 0) { + code = TAOS_SYSTEM_ERROR(code); + stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code)); + goto _ERROR; + } + } else { code = copyFiles_hardlink(srcName, dstName, 0); if (code != 0) { @@ -487,7 +500,7 @@ _ERROR: int32_t backendCopyFiles(const char* src, const char* dst) { return backendFileCopyFilesImpl(src, dst); } static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId, - const char* defaultPath) { + const char* defaultPath, int64_t* processVer) { int32_t code = 0; cleanDir(defaultPath, pTaskIdStr); @@ -512,34 +525,67 @@ static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* ch return code; } -int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath) { - int32_t code = 0; +int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath, + int64_t* processVer) { + int32_t code = -1; + + size_t pathLen = strlen(path); + char* prefixPath = NULL; + char* defaultPath = NULL; + + // alloc buf + prefixPath = taosMemoryCalloc(1, pathLen + 64); + if (prefixPath == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } - char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128); sprintf(prefixPath, "%s%s%s", path, TD_DIRSEP, key); + code = createDirIfNotExist(prefixPath); + if (code != 0) { + goto _EXIT; + } - validateDir(prefixPath); + defaultPath = taosMemoryCalloc(1, pathLen + 128); + if (defaultPath == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } - char* defaultPath = taosMemoryCalloc(1, strlen(path) + 256); sprintf(defaultPath, "%s%s%s", prefixPath, TD_DIRSEP, "state"); + code = createDirIfNotExist(defaultPath); + if (code != 0) { + goto _EXIT; + } - validateDir(defaultPath); - int32_t pathLen = strlen(path) + 256; + // int32_t pathLen = strlen(path) + 48; + char* checkpointRoot = taosMemoryCalloc(1, pathLen + 48); + if (checkpointRoot == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } - char* checkpointRoot = taosMemoryCalloc(1, pathLen); sprintf(checkpointRoot, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints"); - - validateDir(checkpointRoot); - taosMemoryFree(checkpointRoot); + code = createDirIfNotExist(checkpointRoot); + if (code != 0) { + taosMemoryFreeClear(checkpointRoot); + goto _EXIT; + } + taosMemoryFreeClear(checkpointRoot); stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId); char* chkptPath = taosMemoryCalloc(1, pathLen); + if (chkptPath == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } + if (chkptId > 0) { snprintf(chkptPath, pathLen, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkptId); - code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath); + code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath, processVer); if (code != 0) { code = rebuildFromRemoteCheckpoint(key, chkptPath, chkptId, defaultPath); } @@ -559,7 +605,11 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId *dbPath = defaultPath; *dbPrefixPath = prefixPath; + return 0; +_EXIT: + taosMemoryFree(defaultPath); + taosMemoryFree(prefixPath); return code; } @@ -2216,15 +2266,33 @@ _EXIT: return NULL; } -STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId) { +STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer) { char* statePath = NULL; char* dbPath = NULL; - if (restoreCheckpointData(path, key, chkptId, &statePath, &dbPath) != 0) { + if (restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer) != 0) { + stError("failed to restore checkpoint data, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, + chkptId, tstrerror(terrno)); return NULL; } STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath); + if (pTaskDb != NULL) { + int64_t chkpId = -1, ver = -1; + if (chkpLoadExtraInfo(dbPath, &chkpId, &ver) == 0) { + *processVer = ver; + } else { + if (terrno == TSDB_CODE_OUT_OF_MEMORY) { + taskDbDestroy(pTaskDb, false); + return NULL; + } else { + // not info file exists, caller handle this situation + terrno = 0; + *processVer = -1; + } + } + } + taosMemoryFree(dbPath); taosMemoryFree(statePath); return pTaskDb; @@ -2435,7 +2503,8 @@ int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) { int32_t code = 0; - STaskDbWrapper* pTaskDb = taskDbOpen(path, key, 0); + int64_t processVer = -1; + STaskDbWrapper* pTaskDb = taskDbOpen(path, key, 0, &processVer); RocksdbCfInst* pSrcBackend = pCfInst; for (int i = 0; i < nCf; i++) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 03c7b93f91..864f9514da 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -151,7 +151,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { int8_t ret = STREAM_STATA_COMPATIBLE; TBC* pCur = NULL; - if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { // no task info, no stream + if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { // no task info, no stream return ret; } @@ -262,8 +262,9 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) } STaskDbWrapper* pBackend = NULL; + int64_t processVer = -1; while (1) { - pBackend = taskDbOpen(pMeta->path, key, chkpId); + pBackend = taskDbOpen(pMeta->path, key, chkpId, &processVer); if (pBackend != NULL) { break; } @@ -557,7 +558,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { return -1; } - if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) { pTask->ver = SSTREAM_TASK_VER; } @@ -907,7 +908,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (p == NULL) { code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1); if (code < 0) { - stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno)); + stError("failed to expand s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno)); tFreeStreamTask(pTask); continue; } @@ -1012,7 +1013,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (pTask == NULL) { continue; @@ -1052,12 +1053,14 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { } if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) { - entry.checkpointInfo.failed = ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0; + entry.checkpointInfo.failed = + ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0; entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId; entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId; if (entry.checkpointInfo.failed) { - stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.pActiveInfo->transId); + stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, + (*pTask)->chkInfo.pActiveInfo->transId); } } @@ -1384,7 +1387,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa int64_t now = taosGetTimestampMs(); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - stInfo("vgId:%d start to check all %d stream task(s) downstream status, start ts:%"PRId64, vgId, numOfTasks, now); + stInfo("vgId:%d start to check all %d stream task(s) downstream status, start ts:%" PRId64, vgId, numOfTasks, now); if (numOfTasks == 0) { stInfo("vgId:%d no tasks to be started", pMeta->vgId); @@ -1513,8 +1516,8 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { int32_t num = taosArrayGetSize(pMeta->pTaskList); for (int32_t i = 0; i < num; ++i) { SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask == NULL) { continue; } @@ -1598,7 +1601,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 if (pStartInfo->startAllTasks != 1) { int64_t el = endTs - startTs; stDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms", - pMeta->vgId, taskId, ready, el); + pMeta->vgId, taskId, ready, el); streamMetaWUnLock(pMeta); return 0; } @@ -1725,7 +1728,8 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0); stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64 - " ms", id, vgId, transId, el); + " ms", + id, vgId, transId, el); } else { stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", id, vgId, transId, el);