Refator walMeta.c.

This commit is contained in:
xiao-77 2024-12-27 17:43:42 +08:00
parent 86b9110d59
commit 5c192cbcb5
5 changed files with 127 additions and 148 deletions

View File

@ -187,6 +187,15 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
} \
} while (0)
#define TAOS_CHECK_RETURN_SET_CODE(CMD, CODE, ERRNO) \
do { \
int32_t __c = (CMD); \
if (__c != TSDB_CODE_SUCCESS) { \
(CODE) = (ERRNO); \
TAOS_RETURN(__c); \
} \
} while (0)
#define TAOS_CHECK_RETURN_WITH_RELEASE(CMD, PTR1, PTR2) \
do { \
int32_t __c = (CMD); \
@ -225,6 +234,16 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
} \
} while (0)
#define TAOS_CHECK_EXIT_SET_CODE(CMD, CODE, ERRNO) \
do { \
code = (CMD); \
if (code < TSDB_CODE_SUCCESS) { \
(CODE) = (ERRNO); \
lino = __LINE__; \
goto _exit; \
} \
} while (0)
#define TAOS_UNUSED(expr) (void)(expr)
bool taosIsBigChar(char c);

View File

@ -156,6 +156,7 @@ static inline void walResetVer(SWalVer* pVer) {
int32_t walLoadMeta(SWal* pWal);
int32_t walSaveMeta(SWal* pWal);
int32_t walRemoveMeta(SWal* pWal);
int32_t walRollImpl(SWal* pWal);
int32_t walRollFileInfo(SWal* pWal);
int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t* lastVer);
int32_t walCheckAndRepairMeta(SWal* pWal);

View File

@ -57,18 +57,10 @@ FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t*
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
int64_t fileSize = 0;
if (taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0) {
wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
code = terrno;
goto _err;
}
TAOS_CHECK_GOTO(taosStatFile(fnameStr, &fileSize, NULL, NULL), &lino, _err);
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
if (pFile == NULL) {
wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
*lastVer = retVer;
TAOS_RETURN(terrno);
}
TSDB_CHECK_NULL(pFile, code, lino, _err, terrno);
// ensure size as non-negative
pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize);
@ -102,16 +94,10 @@ FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t*
capacity = readSize + sizeof(magic);
ptr = taosMemoryRealloc(buf, capacity);
if (ptr == NULL) {
TAOS_CHECK_GOTO(terrno, &lino, _err);
}
TSDB_CHECK_NULL(ptr, code, lino, _err, terrno);
buf = ptr;
int64_t ret = taosLSeekFile(pFile, offset, SEEK_SET);
if (ret < 0) {
wError("vgId:%d, failed to lseek file due to %s. offset:%" PRId64 "", pWal->cfg.vgId, strerror(errno), offset);
TAOS_CHECK_GOTO(terrno, &lino, _err);
}
TAOS_CHECK_GOTO(taosLSeekFile(pFile, offset, SEEK_SET), &lino, _err);
if (readSize != taosReadFile(pFile, buf, readSize)) {
wError("vgId:%d, failed to read file due to %s. readSize:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno),
@ -166,18 +152,10 @@ FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t*
if (capacity < readSize + extraSize + sizeof(magic)) {
capacity += extraSize;
void* ptr = taosMemoryRealloc(buf, capacity);
if (ptr == NULL) {
TAOS_CHECK_GOTO(terrno, &lino, _err);
}
TSDB_CHECK_NULL(ptr, code, lino, _err, terrno);
buf = ptr;
}
int64_t ret = taosLSeekFile(pFile, offset + readSize, SEEK_SET);
if (ret < 0) {
wError("vgId:%d, failed to lseek file due to %s. offset:%" PRId64 "", pWal->cfg.vgId, strerror(terrno),
offset);
code = terrno;
break;
}
TAOS_CHECK_GOTO(taosLSeekFile(pFile, offset + readSize, SEEK_SET), &lino, _err);
if (extraSize != taosReadFile(pFile, buf + readSize, extraSize)) {
wError("vgId:%d, failed to read file due to %s. offset:%" PRId64 ", extraSize:%" PRId64 ", file:%s",
pWal->cfg.vgId, strerror(errno), offset + readSize, extraSize, fnameStr);
@ -187,10 +165,7 @@ FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t*
}
logContent = (SWalCkHead*)(buf + pos);
code = decryptBody(&pWal->cfg, logContent, logContent->head.bodyLen, __FUNCTION__);
if (code) {
break;
}
TAOS_CHECK_GOTO(decryptBody(&pWal->cfg, logContent, logContent->head.bodyLen, __FUNCTION__), &lino, _err);
if (walValidBodyCksum(logContent) != 0) {
code = TSDB_CODE_WAL_CHKSUM_MISMATCH;
@ -217,10 +192,6 @@ FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t*
if (forwardStage && (terrno != TSDB_CODE_SUCCESS || end == fileSize)) break;
}
if (retVer < 0) {
code = TSDB_CODE_WAL_LOG_NOT_EXIST;
}
// truncate file
if (lastEntryEndOffset != fileSize) {
wWarn("vgId:%d, repair meta truncate file %s to %" PRId64 ", orig size %" PRId64, pWal->cfg.vgId, fnameStr,
@ -240,6 +211,9 @@ FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t*
pFileInfo->fileSize = lastEntryEndOffset;
_err:
if (retVer < 0) {
code = TSDB_CODE_WAL_LOG_NOT_EXIST;
}
taosCloseFile(&pFile);
taosMemoryFree(buf);
*lastVer = retVer;
@ -371,45 +345,37 @@ static int32_t walLogEntriesComplete(const SWal* pWal) {
}
static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
int32_t code = TSDB_CODE_SUCCESS;
TdFilePtr pFile = NULL;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
TdFilePtr pFile = NULL;
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
if (!pFileInfo) {
TAOS_RETURN(TSDB_CODE_FAILED);
}
TSDB_CHECK_NULL(pFileInfo, code, lino, _exit, terrno);
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
int64_t fileSize = 0;
if (taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0) {
wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
code = terrno;
goto _exit;
}
TAOS_CHECK_EXIT(taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0);
int64_t records = TMAX(0, pFileInfo->lastVer - pFileInfo->firstVer + 1);
int64_t lastEndOffset = records * sizeof(SWalIdxEntry);
if (fileSize <= lastEndOffset) {
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
if (fileSize <= lastEndOffset) TAOS_RETURN(TSDB_CODE_SUCCESS);
pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
if (pFile == NULL) {
code = terrno;
goto _exit;
}
TSDB_CHECK_NULL(pFile, code, lino, _exit, terrno);
wInfo("vgId:%d, trim idx file. file: %s, size: %" PRId64 ", offset: %" PRId64, pWal->cfg.vgId, fnameStr, fileSize,
lastEndOffset);
code = taosFtruncateFile(pFile, lastEndOffset);
if (code < 0) {
wError("vgId:%d, failed to truncate file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
goto _exit;
}
TAOS_CHECK_EXIT(taosFtruncateFile(pFile, lastEndOffset));
_exit:
if (code != TSDB_CODE_SUCCESS) {
wError("vgId:%d, failed to trim idx file %s due to %s", pWal->cfg.vgId, fnameStr, tstrerror(code));
}
(void)taosCloseFile(&pFile);
TAOS_RETURN(code);
}
@ -428,32 +394,25 @@ static void printFileSet(int32_t vgId, SArray* fileSet, const char* str) {
int32_t walCheckAndRepairMeta(SWal* pWal) {
// load log files, get first/snapshot/last version info
int32_t code = 0;
int32_t lino = 0;
const char* logPattern = "^[0-9]+.log$";
const char* idxPattern = "^[0-9]+.idx$";
regex_t logRegPattern;
regex_t idxRegPattern;
TdDirPtr pDir = NULL;
SArray* actualLog = NULL;
wInfo("vgId:%d, begin to repair meta, wal path:%s, firstVer:%" PRId64 ", lastVer:%" PRId64 ", snapshotVer:%" PRId64,
pWal->cfg.vgId, pWal->path, pWal->vers.firstVer, pWal->vers.lastVer, pWal->vers.snapshotVer);
if (regcomp(&logRegPattern, logPattern, REG_EXTENDED) != 0) {
wError("failed to compile log pattern, error:%s", tstrerror(terrno));
return terrno;
}
if (regcomp(&idxRegPattern, idxPattern, REG_EXTENDED) != 0) {
wError("failed to compile idx pattern");
return terrno;
}
TAOS_CHECK_EXIT_SET_CODE(regcomp(&logRegPattern, logPattern, REG_EXTENDED), code, terrno);
TdDirPtr pDir = taosOpenDir(pWal->path);
if (pDir == NULL) {
regfree(&logRegPattern);
regfree(&idxRegPattern);
wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
return terrno;
}
TAOS_CHECK_EXIT_SET_CODE(regcomp(&idxRegPattern, idxPattern, REG_EXTENDED), code, terrno);
SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
pDir = taosOpenDir(pWal->path);
TSDB_CHECK_NULL(pDir, code, lino, _exit, terrno);
actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
// scan log files and build new meta
TdDirEntryPtr pDirEntry;
@ -464,28 +423,10 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
SWalFileInfo fileInfo;
(void)memset(&fileInfo, -1, sizeof(SWalFileInfo));
(void)sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
if (!taosArrayPush(actualLog, &fileInfo)) {
regfree(&logRegPattern);
regfree(&idxRegPattern);
int32_t ret = taosCloseDir(&pDir);
if (ret != 0) {
wError("failed to close dir, ret:%s", tstrerror(ret));
return terrno;
}
return terrno;
}
TSDB_CHECK_NULL(taosArrayPush(actualLog, &fileInfo), code, lino, _exit, terrno);
}
}
int32_t ret = taosCloseDir(&pDir);
if (ret != 0) {
wError("failed to close dir, ret:%s", tstrerror(ret));
return terrno;
}
regfree(&logRegPattern);
regfree(&idxRegPattern);
taosArraySort(actualLog, compareWalFileInfo);
wInfo("vgId:%d, actual log file, wal path:%s, num:%d", pWal->cfg.vgId, pWal->path,
@ -500,11 +441,7 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
bool updateMeta = (metaFileNum != actualFileNum);
// rebuild meta of file info
code = walRebuildFileInfoSet(pWal->fileInfoSet, actualLog);
taosArrayDestroy(actualLog);
if (code) {
TAOS_RETURN(code);
}
TAOS_CHECK_EXIT(walRebuildFileInfoSet(pWal->fileInfoSet, actualLog));
wInfo("vgId:%d, log file in meta, wal path:%s, num:%d", pWal->cfg.vgId, pWal->path,
(int32_t)taosArrayGetSize(pWal->fileInfoSet));
@ -521,12 +458,7 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
int32_t code = taosStatFile(fnameStr, &fileSize, NULL, NULL);
if (code < 0) {
wError("failed to stat file since %s. file:%s", terrstr(), fnameStr);
TAOS_RETURN(terrno);
}
TAOS_CHECK_EXIT(taosStatFile(fnameStr, &fileSize, NULL, NULL));
if (pFileInfo->lastVer >= pFileInfo->firstVer && fileSize == pFileInfo->fileSize) {
totSize += pFileInfo->fileSize;
@ -581,22 +513,24 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
wInfo("vgId:%d, success to repair meta, wal path:%s, firstVer:%" PRId64 ", lastVer:%" PRId64 ", snapshotVer:%" PRId64,
pWal->cfg.vgId, pWal->path, pWal->vers.firstVer, pWal->vers.lastVer, pWal->vers.snapshotVer);
_exit:
if (code != TSDB_CODE_SUCCESS) {
wError("vgId:%d, failed to repair meta due to %s, at line:%d", pWal->cfg.vgId, tstrerror(code), lino);
}
taosArrayDestroy(actualLog);
TAOS_UNUSED(taosCloseDir(&pDir));
regfree(&logRegPattern);
regfree(&idxRegPattern);
return code;
}
static int32_t walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) {
if (taosLSeekFile(pLogFile, offset, SEEK_SET) < 0) {
TAOS_RETURN(terrno);
}
if (taosLSeekFile(pLogFile, offset, SEEK_SET) < 0) return terrno;
if (taosReadFile(pLogFile, pCkHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
TAOS_RETURN(terrno);
}
if (taosReadFile(pLogFile, pCkHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) return terrno;
if (walValidHeadCksum(pCkHead) != 0) {
TAOS_RETURN(TSDB_CODE_WAL_CHKSUM_MISMATCH);
}
if (walValidHeadCksum(pCkHead) != 0) return TSDB_CODE_WAL_CHKSUM_MISMATCH;
return TSDB_CODE_SUCCESS;
}
@ -838,29 +772,17 @@ int32_t walMetaSerialize(SWal* pWal, char** serialized) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
if (!cJSON_AddItemToObject(pRoot, "meta", pMeta)) {
wInfo("vgId:%d, failed to add meta to root", pWal->cfg.vgId);
}
if (!cJSON_AddItemToObject(pRoot, "meta", pMeta)) goto _err;
snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.firstVer);
if (cJSON_AddStringToObject(pMeta, "firstVer", buf) == NULL) {
wInfo("vgId:%d, failed to add firstVer to meta", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pMeta, "firstVer", buf) == NULL) goto _err;
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.snapshotVer);
if (cJSON_AddStringToObject(pMeta, "snapshotVer", buf) == NULL) {
wInfo("vgId:%d, failed to add snapshotVer to meta", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pMeta, "snapshotVer", buf) == NULL) goto _err;
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.commitVer);
if (cJSON_AddStringToObject(pMeta, "commitVer", buf) == NULL) {
wInfo("vgId:%d, failed to add commitVer to meta", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pMeta, "commitVer", buf) == NULL) goto _err;
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.lastVer);
if (cJSON_AddStringToObject(pMeta, "lastVer", buf) == NULL) {
wInfo("vgId:%d, failed to add lastVer to meta", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pMeta, "lastVer", buf) == NULL) goto _err;
if (!cJSON_AddItemToObject(pRoot, "files", pFiles)) {
wInfo("vgId:%d, failed to add files to root", pWal->cfg.vgId);
}
if (!cJSON_AddItemToObject(pRoot, "files", pFiles)) goto _err;
SWalFileInfo* pData = pWal->fileInfoSet->pData;
for (int i = 0; i < sz; i++) {
SWalFileInfo* pInfo = &pData[i];
@ -869,31 +791,20 @@ int32_t walMetaSerialize(SWal* pWal, char** serialized) {
}
if (pField == NULL) {
cJSON_Delete(pRoot);
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
// cjson only support int32_t or double
// string are used to prohibit the loss of precision
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->firstVer);
if (cJSON_AddStringToObject(pField, "firstVer", buf) == NULL) {
wInfo("vgId:%d, failed to add firstVer to field", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pField, "firstVer", buf) == NULL) goto _err;
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->lastVer);
if (cJSON_AddStringToObject(pField, "lastVer", buf) == NULL) {
wInfo("vgId:%d, failed to add lastVer to field", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pField, "lastVer", buf) == NULL) goto _err;
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->createTs);
if (cJSON_AddStringToObject(pField, "createTs", buf) == NULL) {
wInfo("vgId:%d, failed to add createTs to field", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pField, "createTs", buf) == NULL) goto _err;
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->closeTs);
if (cJSON_AddStringToObject(pField, "closeTs", buf) == NULL) {
wInfo("vgId:%d, failed to add closeTs to field", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pField, "closeTs", buf) == NULL) goto _err;
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->fileSize);
if (cJSON_AddStringToObject(pField, "fileSize", buf) == NULL) {
wInfo("vgId:%d, failed to add fileSize to field", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pField, "fileSize", buf) == NULL) goto _err;
}
char* pSerialized = cJSON_Print(pRoot);
cJSON_Delete(pRoot);
@ -901,6 +812,9 @@ int32_t walMetaSerialize(SWal* pWal, char** serialized) {
*serialized = pSerialized;
TAOS_RETURN(TSDB_CODE_SUCCESS);
_err:
cJSON_Delete(pRoot);
return TSDB_CODE_FAILED;
}
int32_t walMetaDeserialize(SWal* pWal, const char* bytes) {

View File

@ -274,7 +274,7 @@ _exit:
TAOS_RETURN(code);
}
static int32_t walRollImpl(SWal *pWal) {
int32_t walRollImpl(SWal *pWal) {
int32_t code = 0, lino = 0;
if (pWal->cfg.level == TAOS_WAL_SKIP && pWal->pIdxFile != NULL && pWal->pLogFile != NULL) {

View File

@ -627,6 +627,51 @@ TEST_F(WalKeepEnv, walRollback) {
ASSERT_EQ(code, 0);
}
TEST_F(WalCleanEnv, walRepairLogFileTs2) {
int code;
int i;
for (i = 0; i < 100; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0);
}
code = walRollImpl(pWal);
ASSERT_EQ(code, 0);
for (i = 100; i < 200; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0);
}
code = walRollImpl(pWal);
ASSERT_EQ(code, 0);
for (i = 200; i < 300; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0);
}
code = walRollImpl(pWal);
ASSERT_EQ(code, 0);
// Try to step in ts repair logic.
SWalFileInfo* pFileInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 2);
pFileInfo->closeTs = -1;
code = walCheckAndRepairMeta(pWal);
ASSERT_EQ(code, 0);
}
TEST_F(WalRetentionEnv, repairMeta1) {
walResetEnv();
int code;