add self check info

This commit is contained in:
Yihao Deng 2024-06-25 07:12:09 +00:00
parent 7cab27110a
commit 33aef6ddc5
3 changed files with 104 additions and 31 deletions

View File

@ -141,7 +141,7 @@ SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst);
STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId);
STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer);
void taskDbDestroy(void* pBackend, bool flush);
void taskDbDestroy2(void* pBackend);

View File

@ -326,9 +326,11 @@ void cleanDir(const char* pPath, const char* id) {
}
}
void validateDir(const char* pPath) {
int32_t createDirIfNotExist(const char* pPath) {
if (!taosIsDir(pPath)) {
taosMulMkDir(pPath);
return taosMulMkDir(pPath);
} else {
return 0;
}
}
@ -419,6 +421,9 @@ int32_t backendFileCopyFilesImpl(const char* src, const char* dst) {
const char* current = "CURRENT";
size_t currLen = strlen(current);
const char* info = "info";
size_t infoLen = strlen(info);
int32_t code = 0;
int32_t sLen = strlen(src);
int32_t dLen = strlen(dst);
@ -455,6 +460,14 @@ int32_t backendFileCopyFilesImpl(const char* src, const char* dst) {
stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code));
goto _ERROR;
}
} else if (strncmp(name, info, strlen(name) <= infoLen ? strlen(name) : infoLen) == 0) {
code = copyFiles_create(srcName, dstName, 0);
if (code != 0) {
code = TAOS_SYSTEM_ERROR(code);
stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code));
goto _ERROR;
}
} else {
code = copyFiles_hardlink(srcName, dstName, 0);
if (code != 0) {
@ -487,7 +500,7 @@ _ERROR:
int32_t backendCopyFiles(const char* src, const char* dst) { return backendFileCopyFilesImpl(src, dst); }
static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId,
const char* defaultPath) {
const char* defaultPath, int64_t* processVer) {
int32_t code = 0;
cleanDir(defaultPath, pTaskIdStr);
@ -512,34 +525,67 @@ static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* ch
return code;
}
int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath) {
int32_t code = 0;
int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath,
int64_t* processVer) {
int32_t code = -1;
size_t pathLen = strlen(path);
char* prefixPath = NULL;
char* defaultPath = NULL;
// alloc buf
prefixPath = taosMemoryCalloc(1, pathLen + 64);
if (prefixPath == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT;
}
char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128);
sprintf(prefixPath, "%s%s%s", path, TD_DIRSEP, key);
code = createDirIfNotExist(prefixPath);
if (code != 0) {
goto _EXIT;
}
validateDir(prefixPath);
defaultPath = taosMemoryCalloc(1, pathLen + 128);
if (defaultPath == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT;
}
char* defaultPath = taosMemoryCalloc(1, strlen(path) + 256);
sprintf(defaultPath, "%s%s%s", prefixPath, TD_DIRSEP, "state");
code = createDirIfNotExist(defaultPath);
if (code != 0) {
goto _EXIT;
}
validateDir(defaultPath);
int32_t pathLen = strlen(path) + 256;
// int32_t pathLen = strlen(path) + 48;
char* checkpointRoot = taosMemoryCalloc(1, pathLen + 48);
if (checkpointRoot == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT;
}
char* checkpointRoot = taosMemoryCalloc(1, pathLen);
sprintf(checkpointRoot, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints");
validateDir(checkpointRoot);
taosMemoryFree(checkpointRoot);
code = createDirIfNotExist(checkpointRoot);
if (code != 0) {
taosMemoryFreeClear(checkpointRoot);
goto _EXIT;
}
taosMemoryFreeClear(checkpointRoot);
stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId);
char* chkptPath = taosMemoryCalloc(1, pathLen);
if (chkptPath == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT;
}
if (chkptId > 0) {
snprintf(chkptPath, pathLen, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint",
chkptId);
code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath);
code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath, processVer);
if (code != 0) {
code = rebuildFromRemoteCheckpoint(key, chkptPath, chkptId, defaultPath);
}
@ -559,7 +605,11 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
*dbPath = defaultPath;
*dbPrefixPath = prefixPath;
return 0;
_EXIT:
taosMemoryFree(defaultPath);
taosMemoryFree(prefixPath);
return code;
}
@ -2216,15 +2266,33 @@ _EXIT:
return NULL;
}
STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId) {
STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer) {
char* statePath = NULL;
char* dbPath = NULL;
if (restoreCheckpointData(path, key, chkptId, &statePath, &dbPath) != 0) {
if (restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer) != 0) {
stError("failed to restore checkpoint data, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key,
chkptId, tstrerror(terrno));
return NULL;
}
STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath);
if (pTaskDb != NULL) {
int64_t chkpId = -1, ver = -1;
if (chkpLoadExtraInfo(dbPath, &chkpId, &ver) == 0) {
*processVer = ver;
} else {
if (terrno == TSDB_CODE_OUT_OF_MEMORY) {
taskDbDestroy(pTaskDb, false);
return NULL;
} else {
// not info file exists, caller handle this situation
terrno = 0;
*processVer = -1;
}
}
}
taosMemoryFree(dbPath);
taosMemoryFree(statePath);
return pTaskDb;
@ -2435,7 +2503,8 @@ int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) {
int32_t code = 0;
STaskDbWrapper* pTaskDb = taskDbOpen(path, key, 0);
int64_t processVer = -1;
STaskDbWrapper* pTaskDb = taskDbOpen(path, key, 0, &processVer);
RocksdbCfInst* pSrcBackend = pCfInst;
for (int i = 0; i < nCf; i++) {

View File

@ -151,7 +151,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
int8_t ret = STREAM_STATA_COMPATIBLE;
TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { // no task info, no stream
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { // no task info, no stream
return ret;
}
@ -262,8 +262,9 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
}
STaskDbWrapper* pBackend = NULL;
int64_t processVer = -1;
while (1) {
pBackend = taskDbOpen(pMeta->path, key, chkpId);
pBackend = taskDbOpen(pMeta->path, key, chkpId, &processVer);
if (pBackend != NULL) {
break;
}
@ -557,7 +558,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
return -1;
}
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
pTask->ver = SSTREAM_TASK_VER;
}
@ -907,7 +908,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (p == NULL) {
code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
if (code < 0) {
stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno));
stError("failed to expand s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno));
tFreeStreamTask(pTask);
continue;
}
@ -1012,7 +1013,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (pTask == NULL) {
continue;
@ -1052,12 +1053,14 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
}
if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) {
entry.checkpointInfo.failed = ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0;
entry.checkpointInfo.failed =
((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0;
entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId;
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId;
if (entry.checkpointInfo.failed) {
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.pActiveInfo->transId);
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr,
(*pTask)->chkInfo.pActiveInfo->transId);
}
}
@ -1384,7 +1387,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa
int64_t now = taosGetTimestampMs();
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stInfo("vgId:%d start to check all %d stream task(s) downstream status, start ts:%"PRId64, vgId, numOfTasks, now);
stInfo("vgId:%d start to check all %d stream task(s) downstream status, start ts:%" PRId64, vgId, numOfTasks, now);
if (numOfTasks == 0) {
stInfo("vgId:%d no tasks to be started", pMeta->vgId);
@ -1513,8 +1516,8 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
int32_t num = taosArrayGetSize(pMeta->pTaskList);
for (int32_t i = 0; i < num; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL) {
continue;
}
@ -1598,7 +1601,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
if (pStartInfo->startAllTasks != 1) {
int64_t el = endTs - startTs;
stDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms",
pMeta->vgId, taskId, ready, el);
pMeta->vgId, taskId, ready, el);
streamMetaWUnLock(pMeta);
return 0;
}
@ -1725,7 +1728,8 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt
taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0);
stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64
" ms", id, vgId, transId, el);
" ms",
id, vgId, transId, el);
} else {
stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms",
id, vgId, transId, el);