From cccfa17027c8375945b5131c4fd0c69f9cd4d1b2 Mon Sep 17 00:00:00 2001 From: "benguang.zhao" Date: Mon, 31 Oct 2022 23:58:49 +0800 Subject: [PATCH 1/9] fix: fsync wal files and their meta if data size unsynced exceeds a limit --- source/libs/wal/inc/walInt.h | 7 +++++++ source/libs/wal/src/walMeta.c | 10 ++++++++++ source/libs/wal/src/walWrite.c | 21 +++++++++++++++++---- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 1aea0e8148..6dc9922981 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -34,6 +34,7 @@ typedef struct { int64_t createTs; int64_t closeTs; int64_t fileSize; + int64_t syncedOffset; } SWalFileInfo; typedef struct WalIdxEntry { @@ -66,6 +67,12 @@ static inline int64_t walGetLastFileSize(SWal* pWal) { return pInfo->fileSize; } +static inline int64_t walGetLastFileCachedSize(SWal* pWal) { + if (taosArrayGetSize(pWal->fileInfoSet) == 0) return 0; + SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + return (pInfo->fileSize - pInfo->syncedOffset); +} + static inline int64_t walGetLastFileFirstVer(SWal* pWal) { if (taosArrayGetSize(pWal->fileInfoSet) == 0) return -1; SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index b489df2f4c..bcc9a11458 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -589,6 +589,7 @@ int walRollFileInfo(SWal* pWal) { pNewInfo->createTs = ts; pNewInfo->closeTs = -1; pNewInfo->fileSize = 0; + pNewInfo->syncedOffset = 0; taosArrayPush(pArray, pNewInfo); taosMemoryFree(pNewInfo); return 0; @@ -739,6 +740,12 @@ static int walFindCurMetaVer(SWal* pWal) { return metaVer; } +void walUpdateSyncedOffset(SWal* pWal) { + SWalFileInfo* pFileInfo = walGetCurFileInfo(pWal); + if (pFileInfo == NULL) return; + pFileInfo->syncedOffset = pFileInfo->fileSize; +} + int walSaveMeta(SWal* pWal) { int metaVer = walFindCurMetaVer(pWal); char fnameStr[WAL_FILE_LEN]; @@ -758,6 +765,9 @@ int walSaveMeta(SWal* pWal) { return -1; } + // update synced offset + (void)walUpdateSyncedOffset(pWal); + // flush to a tmpfile n = walBuildTmpMetaName(pWal, tmpFnameStr); ASSERT(n < sizeof(tmpFnameStr) && "Buffer overflow of file name"); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 527ffa0056..7f4fbb7c71 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -252,23 +252,36 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) { } } + if (walGetLastFileCachedSize(pWal) > WAL_RECOV_SIZE_LIMIT / 2) { + if (walSaveMeta(pWal) < 0) { + return -1; + } + } + return 0; } int32_t walBeginSnapshot(SWal *pWal, int64_t ver) { + taosThreadMutexLock(&pWal->mutex); + pWal->vers.verInSnapshotting = ver; wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer); // check file rolling if (pWal->cfg.retentionPeriod == 0) { - taosThreadMutexLock(&pWal->mutex); if (walGetLastFileSize(pWal) != 0) { - walRollImpl(pWal); + if (walRollImpl(pWal) < 0) { + wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr()); + goto _err; + } } - taosThreadMutexUnlock(&pWal->mutex); } - + taosThreadMutexUnlock(&pWal->mutex); return 0; + +_err: + taosThreadMutexUnlock(&pWal->mutex); + return -1; } int32_t walEndSnapshot(SWal *pWal) { From 07db554fab8afd96cdbebbed3a0974248a188815 Mon Sep 17 00:00:00 2001 From: "benguang.zhao" Date: Tue, 1 Nov 2022 00:00:18 +0800 Subject: [PATCH 2/9] enh: persist wal at the beginning of vnodeCommit --- include/libs/wal/wal.h | 3 ++- source/dnode/vnode/src/vnd/vnodeCommit.c | 6 ++++++ source/libs/wal/src/walMgmt.c | 7 +++++++ 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 08dba5d50d..8f936f988b 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -43,7 +43,7 @@ extern "C" { #define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL #define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) -#define WAL_RECOV_SIZE_LIMIT (100 * WAL_SCAN_BUF_SIZE) +#define WAL_RECOV_SIZE_LIMIT (200 * WAL_SCAN_BUF_SIZE) typedef enum { TAOS_WAL_WRITE = 1, @@ -159,6 +159,7 @@ void walCleanUp(); // handle open and ctl SWal *walOpen(const char *path, SWalCfg *pCfg); int32_t walAlter(SWal *, SWalCfg *pCfg); +int32_t walPersist(SWal *); void walClose(SWal *); // write interfaces diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index e593bbd602..7040d2d7c8 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -212,6 +212,12 @@ int vnodeCommit(SVnode *pVnode) { vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applied); + // persist wal before starting + if (walPersist(pVnode->pWal) < 0) { + vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr()); + return -1; + } + pVnode->state.commitTerm = pVnode->state.applyTerm; // save info diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 0df1a6b387..1a70a3038f 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -187,6 +187,13 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; } +int32_t walPersist(SWal *pWal) { + taosThreadMutexLock(&pWal->mutex); + int32_t ret = walSaveMeta(pWal); + taosThreadMutexUnlock(&pWal->mutex); + return ret; +} + void walClose(SWal *pWal) { taosThreadMutexLock(&pWal->mutex); (void)walSaveMeta(pWal); From 26778f29da330f4c9152227003a105b95650a601 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 1 Nov 2022 10:42:13 +0800 Subject: [PATCH 3/9] enh: add config for persist wal at the beginning of vnodeCommit --- include/common/tglobal.h | 3 +++ include/libs/wal/wal.h | 1 - source/common/src/tglobal.c | 3 +++ source/libs/wal/src/walMeta.c | 6 +++--- source/libs/wal/src/walWrite.c | 3 ++- 5 files changed, 11 insertions(+), 5 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index cb4426f8a9..f277eea219 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -125,6 +125,9 @@ extern char tsSmlChildTableName[]; extern char tsSmlTagName[]; extern bool tsSmlDataFormat; +// wal +extern int64_t tsWalRecoverSizeLimit; + // internal extern int32_t tsTransPullupInterval; extern int32_t tsMqRebalanceInterval; diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 8f936f988b..ed85fd3517 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -43,7 +43,6 @@ extern "C" { #define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL #define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) -#define WAL_RECOV_SIZE_LIMIT (200 * WAL_SCAN_BUF_SIZE) typedef enum { TAOS_WAL_WRITE = 1, diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index fbb9e04a25..97c54390c0 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -156,6 +156,9 @@ char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPR // udf bool tsStartUdfd = true; +// wal +int64_t tsWalRecoverSizeLimit = (600 * 1024 * 1024L); + // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index bcc9a11458..e9b615b300 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -18,6 +18,7 @@ #include "taoserror.h" #include "tutil.h" #include "walInt.h" +#include "tglobal.h" bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) { return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver; @@ -82,11 +83,10 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { readSize = end - offset; capacity = readSize + sizeof(magic); - int64_t limit = WAL_RECOV_SIZE_LIMIT; - if (limit < readSize) { + if (tsWalRecoverSizeLimit < readSize) { wError("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64 ", end:%" PRId64 ", file:%s", - pWal->cfg.vgId, limit, offset, end, fnameStr); + pWal->cfg.vgId, tsWalRecoverSizeLimit, offset, end, fnameStr); terrno = TSDB_CODE_WAL_SIZE_LIMIT; goto _err; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 7f4fbb7c71..005450a267 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -16,6 +16,7 @@ #include "os.h" #include "taoserror.h" #include "tchecksum.h" +#include "tglobal.h" #include "walInt.h" int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { @@ -252,7 +253,7 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) { } } - if (walGetLastFileCachedSize(pWal) > WAL_RECOV_SIZE_LIMIT / 2) { + if (walGetLastFileCachedSize(pWal) > tsWalRecoverSizeLimit / 2) { if (walSaveMeta(pWal) < 0) { return -1; } From a2d11e0fdb7200448aa7333ca62df63f997ca8b1 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 1 Nov 2022 11:27:28 +0800 Subject: [PATCH 4/9] enh: add config for persist wal at the beginning of vnodeCommit --- source/common/src/tglobal.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 97c54390c0..b76e9f6d67 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -425,6 +425,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1; + if (cfgAddInt64(pCfg, "walRecoverSizeLimit", tsWalRecoverSizeLimit, 3*1024*1024, INT64_MAX, 0) != 0) return -1; + if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; GRANT_CFG_ADD; @@ -723,6 +725,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32; tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32; + tsWalRecoverSizeLimit = cfgGetItem(pCfg, "walRecoverSizeLimit")->i32; + tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); From 2990a127cb3293fb4c8574b46e319db4368a24ae Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 1 Nov 2022 11:29:02 +0800 Subject: [PATCH 5/9] enh: add config for persist wal at the beginning of vnodeCommit --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index b76e9f6d67..0d45454731 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -725,7 +725,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32; tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32; - tsWalRecoverSizeLimit = cfgGetItem(pCfg, "walRecoverSizeLimit")->i32; + tsWalRecoverSizeLimit = cfgGetItem(pCfg, "walRecoverSizeLimit")->i64; tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); From 6907e3dfe310f7e7eddf6ed41b684ff1fcdfe6c0 Mon Sep 17 00:00:00 2001 From: "benguang.zhao" Date: Tue, 1 Nov 2022 15:25:18 +0800 Subject: [PATCH 6/9] enh: scan forward step by step in walScanLogGetLastVer --- source/libs/wal/src/walMeta.c | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index bcc9a11458..33c8e6e4d7 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -65,6 +65,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { // ensure size as non-negative pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize); + int64_t stepSize = WAL_SCAN_BUF_SIZE; uint64_t magic = WAL_MAGIC; int64_t walCkHeadSz = sizeof(SWalCkHead); int64_t end = fileSize; @@ -74,23 +75,25 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { char* buf = NULL; int64_t found = -1; bool firstTrial = pFileInfo->fileSize < fileSize; + int64_t border = TMIN(pFileInfo->fileSize, fileSize); + int64_t offsetForward = border - stepSize + walCkHeadSz - 1; + int64_t offsetBackward = border; // search for the valid last WAL entry, e.g. block by block while (1) { - offset = (firstTrial) ? pFileInfo->fileSize : TMAX(0, end - WAL_SCAN_BUF_SIZE); + offset = (firstTrial) ? TMIN(fileSize, offsetForward + stepSize - walCkHeadSz + 1) + : TMAX(0, offsetBackward - stepSize + walCkHeadSz - 1); + end = TMIN(offset + stepSize, fileSize); + if (firstTrial) { + offsetForward = offset; + } else { + offsetBackward = offset; + } + ASSERT(offset <= end); readSize = end - offset; capacity = readSize + sizeof(magic); - int64_t limit = WAL_RECOV_SIZE_LIMIT; - if (limit < readSize) { - wError("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64 - ", end:%" PRId64 ", file:%s", - pWal->cfg.vgId, limit, offset, end, fnameStr); - terrno = TSDB_CODE_WAL_SIZE_LIMIT; - goto _err; - } - void* ptr = taosMemoryRealloc(buf, capacity); if (ptr == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; @@ -127,6 +130,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { } logContent = (SWalCkHead*)(buf + pos); if (walValidHeadCksum(logContent) != 0) { + terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH; wWarn("vgId:%d, failed to validate checksum of wal entry header. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, offset + pos, fnameStr); haystack = buf + pos + 1; @@ -183,11 +187,9 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { haystack = buf + pos + 1; } + if (end == fileSize) firstTrial = false; + if (firstTrial && terrno == TSDB_CODE_SUCCESS) continue; if (found >= 0 || offset == 0) break; - - // go backwards, e.g. by at most one WAL scan buf size - end = TMIN(offset + walCkHeadSz - 1, fileSize); - firstTrial = false; } // determine end of last entry From 4237cafb21273c9b9376ff676aa37bfb345772c1 Mon Sep 17 00:00:00 2001 From: "benguang.zhao" Date: Tue, 1 Nov 2022 17:19:33 +0800 Subject: [PATCH 7/9] enh: rename walRecoverSizeLimit to walFsyncDataSizeLimit --- include/common/tglobal.h | 2 +- source/common/src/tglobal.c | 7 ++++--- source/libs/wal/src/walMeta.c | 4 ++-- source/libs/wal/src/walWrite.c | 2 +- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index f277eea219..f12f03499f 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -126,7 +126,7 @@ extern char tsSmlTagName[]; extern bool tsSmlDataFormat; // wal -extern int64_t tsWalRecoverSizeLimit; +extern int64_t tsWalFsyncDataSizeLimit; // internal extern int32_t tsTransPullupInterval; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f1a0fa1145..3bfd10f687 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -157,7 +157,7 @@ char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPR bool tsStartUdfd = true; // wal -int64_t tsWalRecoverSizeLimit = (600 * 1024 * 1024L); +int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L); // internal int32_t tsTransPullupInterval = 2; @@ -425,7 +425,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1; - if (cfgAddInt64(pCfg, "walRecoverSizeLimit", tsWalRecoverSizeLimit, 3 * 1024 * 1024, INT64_MAX, 0) != 0) return -1; + if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, 0) != 0) + return -1; if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; @@ -725,7 +726,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32; tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32; - tsWalRecoverSizeLimit = cfgGetItem(pCfg, "walRecoverSizeLimit")->i64; + tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64; tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index f52b80d1d3..4f8846d0ec 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -80,10 +80,10 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { int64_t offsetBackward = offset; int64_t recoverSize = end - offset; - if (tsWalRecoverSizeLimit < recoverSize) { + if (2 * tsWalFsyncDataSizeLimit < recoverSize) { wError("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64 ", end:%" PRId64 ", file:%s", - pWal->cfg.vgId, tsWalRecoverSizeLimit, offset, end, fnameStr); + pWal->cfg.vgId, 2 * tsWalFsyncDataSizeLimit, offset, end, fnameStr); terrno = TSDB_CODE_WAL_SIZE_LIMIT; goto _err; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 005450a267..b5e9346b65 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -253,7 +253,7 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) { } } - if (walGetLastFileCachedSize(pWal) > tsWalRecoverSizeLimit / 2) { + if (walGetLastFileCachedSize(pWal) > tsWalFsyncDataSizeLimit) { if (walSaveMeta(pWal) < 0) { return -1; } From 3c1c95c5e6f750374381068cc5ba41cdade98fc5 Mon Sep 17 00:00:00 2001 From: "benguang.zhao" Date: Tue, 1 Nov 2022 17:26:31 +0800 Subject: [PATCH 8/9] enh: turn error on WAL data size to recover exceeding a limit as warn --- source/libs/wal/src/walMeta.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 4f8846d0ec..f70f423436 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -81,11 +81,9 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { int64_t recoverSize = end - offset; if (2 * tsWalFsyncDataSizeLimit < recoverSize) { - wError("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64 - ", end:%" PRId64 ", file:%s", - pWal->cfg.vgId, 2 * tsWalFsyncDataSizeLimit, offset, end, fnameStr); - terrno = TSDB_CODE_WAL_SIZE_LIMIT; - goto _err; + wWarn("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64 + ", end:%" PRId64 ", file:%s", + pWal->cfg.vgId, 2 * tsWalFsyncDataSizeLimit, offset, end, fnameStr); } // search for the valid last WAL entry, e.g. block by block From 4e46b78aa0fccf518572879a84caae369d07b610 Mon Sep 17 00:00:00 2001 From: "benguang.zhao" Date: Tue, 1 Nov 2022 17:56:24 +0800 Subject: [PATCH 9/9] fix: update info of WAL entry found within loop in walScanLogGetLastVer --- source/libs/wal/src/walMeta.c | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index f70f423436..f42bc46e3c 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -73,14 +73,16 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { int64_t capacity = 0; int64_t readSize = 0; char* buf = NULL; - int64_t found = -1; bool firstTrial = pFileInfo->fileSize < fileSize; int64_t offset = TMIN(pFileInfo->fileSize, fileSize); int64_t offsetForward = offset - stepSize + walCkHeadSz - 1; int64_t offsetBackward = offset; - int64_t recoverSize = end - offset; + int64_t retVer = -1; + int64_t lastEntryBeginOffset = 0; + int64_t lastEntryEndOffset = 0; - if (2 * tsWalFsyncDataSizeLimit < recoverSize) { + // check recover size + if (2 * tsWalFsyncDataSizeLimit + offset < end) { wWarn("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64 ", end:%" PRId64 ", file:%s", pWal->cfg.vgId, 2 * tsWalFsyncDataSizeLimit, offset, end, fnameStr); @@ -190,44 +192,41 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { } // found one - found = pos; + retVer = logContent->head.version; + lastEntryBeginOffset = offset + pos; + lastEntryEndOffset = offset + pos + sizeof(SWalCkHead) + logContent->head.bodyLen; + + // try next haystack = buf + pos + 1; } if (end == fileSize) firstTrial = false; if (firstTrial && terrno == TSDB_CODE_SUCCESS) continue; - if (found >= 0 || offset == 0) break; + if (retVer >= 0 || offset == 0) break; } - // determine end of last entry - SWalCkHead* lastEntry = (found >= 0) ? (SWalCkHead*)(buf + found) : NULL; - int64_t retVer = -1; - int64_t lastEntryBeginOffset = 0; - int64_t lastEntryEndOffset = 0; - - if (lastEntry == NULL) { + if (retVer < 0) { terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; - } else { - retVer = lastEntry->head.version; - lastEntryBeginOffset = offset + (int64_t)((char*)lastEntry - (char*)buf); - lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen; } // truncate file if (lastEntryEndOffset != fileSize) { wWarn("vgId:%d, repair meta truncate file %s to %" PRId64 ", orig size %" PRId64, pWal->cfg.vgId, fnameStr, lastEntryEndOffset, fileSize); + if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) { wError("failed to truncate file due to %s. file:%s", strerror(errno), fnameStr); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } + if (taosFsyncFile(pFile) < 0) { wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } } + pFileInfo->fileSize = lastEntryEndOffset; taosCloseFile(&pFile);