refactor: rsma commit and recovery

This commit is contained in:
Cary Xu 2022-07-02 18:52:15 +08:00
parent 6f11d625b0
commit 0e9a22ffc8
4 changed files with 69 additions and 33 deletions

View File

@ -22,7 +22,7 @@
extern "C" { extern "C" {
#endif #endif
#define SMA_DEBUG_MODE // TODO: remove when release #undef SMA_DEBUG_MODE // TODO: remove when release
// smaDebug ================ // smaDebug ================
// clang-format off // clang-format off
@ -250,7 +250,7 @@ void tdCloseTFile(STFile *pTFile);
void tdDestroyTFile(STFile *pTFile); void tdDestroyTFile(STFile *pTFile);
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version, char *outputName); void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version, char *outputName);
void tdGetVndDirName(int32_t vgId,const char *pdname, const char *dname, char *outputName); void tdGetVndDirName(int32_t vgId,const char *pdname, const char *dname, bool endWithSep, char *outputName);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -157,13 +157,19 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) {
TdDirPtr pDir = NULL; TdDirPtr pDir = NULL;
TdDirEntryPtr pDirEntry = NULL; TdDirEntryPtr pDirEntry = NULL;
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
const char *pattern = "^v[0-9]+qtaskinfo\\.ver([0-9]+)?$"; const char *pattern = "v[0-9]+qtaskinfo\\.ver([0-9]+)?$";
regex_t regex; regex_t regex;
int code = 0;
tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, dir); tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir);
// Resource allocation and init // Resource allocation and init
regcomp(&regex, pattern, REG_EXTENDED); if ((code = regcomp(&regex, pattern, REG_EXTENDED)) != 0) {
char errbuf[128];
regerror(code, &regex, errbuf, sizeof(errbuf));
smaWarn("vgId:%d, rsma post commit, regcomp for %s failed since %s", TD_VID(pVnode), dir, errbuf);
return TSDB_CODE_FAILED;
}
if ((pDir = taosOpenDir(dir)) == NULL) { if ((pDir = taosOpenDir(dir)) == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -171,45 +177,48 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int32_t dirLen = strlen(dir);
char *dirEnd = POINTER_SHIFT(dir, dirLen);
regmatch_t regMatch[2]; regmatch_t regMatch[2];
while ((pDirEntry = taosReadDir(pDir)) != NULL) { while ((pDirEntry = taosReadDir(pDir)) != NULL) {
char *entryName = taosGetDirEntryName(pDirEntry); char *entryName = taosGetDirEntryName(pDirEntry);
if (!entryName) { if (!entryName) {
continue; continue;
} }
char *fileName = taosDirEntryBaseName(entryName);
int code = regexec(&regex, fileName, 2, regMatch, 0); code = regexec(&regex, entryName, 2, regMatch, 0);
if (code == 0) { if (code == 0) {
// match // match
smaDebug("vgId:%d, matched = %s, %s", TD_VID(pVnode), (char *)POINTER_SHIFT(fileName, regMatch[0].rm_so),
(const char *)POINTER_SHIFT(fileName, regMatch[1].rm_so));
int64_t version = -1; int64_t version = -1;
sscanf((const char *)POINTER_SHIFT(fileName, regMatch[1].rm_so), "%" PRIi64, &version); sscanf((const char *)POINTER_SHIFT(entryName, regMatch[1].rm_so), "%" PRIi64, &version);
if ((version < committed) && (version > -1)) { if ((version < committed) && (version > -1)) {
if (taosRemoveFile(entryName) != 0) { strncpy(dirEnd, entryName, TSDB_FILENAME_LEN - dirLen);
if (taosRemoveFile(dir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
smaWarn("vgId:%d, committed version:%" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed, smaWarn("vgId:%d, committed version:%" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed,
entryName, terrstr()); dir, terrstr());
} else { } else {
smaDebug("vgId:%d, committed version:%" PRIi64 ", success to remove %s", TD_VID(pVnode), committed, smaDebug("vgId:%d, committed version:%" PRIi64 ", success to remove %s", TD_VID(pVnode), committed, dir);
entryName);
} }
} }
} else if (code == REG_NOMATCH) { } else if (code == REG_NOMATCH) {
// not match // not match
smaInfo("vgId:%d, rsma post commit, not match %s", TD_VID(pVnode), fileName); smaTrace("vgId:%d, rsma post commit, not match %s", TD_VID(pVnode), entryName);
continue; continue;
} else { } else {
// has other error // has other error
terrno = TAOS_SYSTEM_ERROR(code); char errbuf[128];
smaWarn("vgId:%d, rsma post commit, regexec failed since %s", TD_VID(pVnode), terrstr()); regerror(code, &regex, errbuf, sizeof(errbuf));
smaWarn("vgId:%d, rsma post commit, regexec failed since %s", TD_VID(pVnode), errbuf);
taosCloseDir(&pDir); taosCloseDir(&pDir);
regfree(&regex); regfree(&regex);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
} }
taosCloseDir(&pDir); taosCloseDir(&pDir);
regfree(&regex);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -15,8 +15,8 @@
#include "sma.h" #include "sma.h"
#define RSMA_QTASKINFO_BUFSIZE 32768 #define RSMA_QTASKINFO_BUFSIZE 32768
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
SSmaMgmt smaMgmt = { SSmaMgmt smaMgmt = {
.smaRef = -1, .smaRef = -1,
@ -42,7 +42,7 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter); static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter);
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem); static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma); static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma); static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
@ -743,7 +743,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma) { static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t)); SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
@ -753,7 +753,12 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int32_t arrSize = taosArrayGetSize(suidList); int64_t arrSize = taosArrayGetSize(suidList);
if (nTables) {
*nTables = arrSize;
}
if (arrSize == 0) { if (arrSize == 0) {
taosArrayDestroy(suidList); taosArrayDestroy(suidList);
smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode)); smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode));
@ -762,9 +767,9 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0); metaReaderInit(&mr, SMA_META(pSma), 0);
for (int32_t i = 0; i < arrSize; ++i) { for (int64_t i = 0; i < arrSize; ++i) {
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
smaDebug("vgId:%d, rsma restore, suid[%d] is %" PRIi64, TD_VID(pVnode), i, suid); smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid);
if (metaGetTableEntryByUid(&mr, suid) < 0) { if (metaGetTableEntryByUid(&mr, suid) < 0) {
smaError("vgId:%d, rsma restore, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid, smaError("vgId:%d, rsma restore, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid,
terrstr()); terrstr());
@ -809,7 +814,13 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
} }
if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) { if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
return TSDB_CODE_SUCCESS; if (pVnode->state.committed) {
goto _err;
} else {
smaDebug("vgId:%d, rsma restore for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode),
pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile));
return TSDB_CODE_SUCCESS;
}
} }
if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) { if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) {
@ -836,7 +847,8 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
tdDestroyTFile(&tFile); tdDestroyTFile(&tFile);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
smaError("rsma restore, qtaskinfo reload failed since %s", terrstr()); smaError("vgId:%d, rsma restore for version %" PRIi64 ", qtaskinfo reload failed since %s", TD_VID(pVnode),
pVnode->state.committed, terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
@ -855,10 +867,15 @@ _err:
} }
int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
int64_t nTables = 0;
// step 1: iterate all stables to restore the rsma env // step 1: iterate all stables to restore the rsma env
if (tdRSmaRestoreQTaskInfoInit(pSma) < 0) { if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) {
goto _err; goto _err;
} }
if (nTables <= 0) {
smaDebug("vgId:%d, no need to restore rsma task since no tables", SMA_VID(pSma));
return TSDB_CODE_SUCCESS;
}
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore // step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
if (tdRSmaRestoreQTaskInfoReload(pSma) < 0) { if (tdRSmaRestoreQTaskInfoReload(pSma) < 0) {
@ -872,7 +889,7 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
smaError("failed to restore rsma task since %s", terrstr()); smaError("vgId:%d failed to restore rsma task since %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
@ -1012,7 +1029,8 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead); pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) { if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
smaError("restore rsma qtaskinfo file %s failed since %s", TD_TFILE_FULL_NAME(pIter->pTFile), terrstr()); smaError("vgId:%d, restore rsma qtaskinfo file %s failed since %s", SMA_VID(pSma),
TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }

View File

@ -202,12 +202,21 @@ void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const
} }
} }
void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, char *outputName) { void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) {
if (pdname) { if (pdname) {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, if (endWithSep) {
dname); snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP,
dname, TD_DIRSEP);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP,
dname);
}
} else { } else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, dname); if (endWithSep) {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP);
} else {
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, dname);
}
} }
} }