diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 7df6124152..99bbfde3e1 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -126,6 +126,9 @@ extern char tsSmlChildTableName[]; extern char tsSmlTagName[]; extern bool tsSmlDataFormat; +// wal +extern int64_t tsWalFsyncDataSizeLimit; + // internal extern int32_t tsTransPullupInterval; extern int32_t tsMqRebalanceInterval; diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 08dba5d50d..ed85fd3517 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -43,7 +43,6 @@ extern "C" { #define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL #define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) -#define WAL_RECOV_SIZE_LIMIT (100 * WAL_SCAN_BUF_SIZE) typedef enum { TAOS_WAL_WRITE = 1, @@ -159,6 +158,7 @@ void walCleanUp(); // handle open and ctl SWal *walOpen(const char *path, SWalCfg *pCfg); int32_t walAlter(SWal *, SWalCfg *pCfg); +int32_t walPersist(SWal *); void walClose(SWal *); // write interfaces diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 660730f0dc..7f4a826c5e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -156,6 +156,9 @@ char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPR // udf bool tsStartUdfd = true; +// wal +int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L); + // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; @@ -423,6 +426,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1; + if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, 0) != 0) + return -1; + if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1; @@ -722,6 +728,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32; tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32; + tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64; + tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath)); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index e593bbd602..7040d2d7c8 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -212,6 +212,12 @@ int vnodeCommit(SVnode *pVnode) { vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applied); + // persist wal before starting + if (walPersist(pVnode->pWal) < 0) { + vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr()); + return -1; + } + pVnode->state.commitTerm = pVnode->state.applyTerm; // save info diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 1aea0e8148..6dc9922981 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -34,6 +34,7 @@ typedef struct { int64_t createTs; int64_t closeTs; int64_t fileSize; + int64_t syncedOffset; } SWalFileInfo; typedef struct WalIdxEntry { @@ -66,6 +67,12 @@ static inline int64_t walGetLastFileSize(SWal* pWal) { return pInfo->fileSize; } +static inline int64_t walGetLastFileCachedSize(SWal* pWal) { + if (taosArrayGetSize(pWal->fileInfoSet) == 0) return 0; + SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + return (pInfo->fileSize - pInfo->syncedOffset); +} + static inline int64_t walGetLastFileFirstVer(SWal* pWal) { if (taosArrayGetSize(pWal->fileInfoSet) == 0) return -1; SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index ad27a50f68..3baa906b19 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -16,6 +16,7 @@ #include "cJSON.h" #include "os.h" #include "taoserror.h" +#include "tglobal.h" #include "tutil.h" #include "walInt.h" @@ -65,32 +66,43 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { // ensure size as non-negative pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize); + int64_t stepSize = WAL_SCAN_BUF_SIZE; uint64_t magic = WAL_MAGIC; int64_t walCkHeadSz = sizeof(SWalCkHead); int64_t end = fileSize; - int64_t offset = 0; int64_t capacity = 0; int64_t readSize = 0; char* buf = NULL; - int64_t found = -1; bool firstTrial = pFileInfo->fileSize < fileSize; + int64_t offset = TMIN(pFileInfo->fileSize, fileSize); + int64_t offsetForward = offset - stepSize + walCkHeadSz - 1; + int64_t offsetBackward = offset; + int64_t retVer = -1; + int64_t lastEntryBeginOffset = 0; + int64_t lastEntryEndOffset = 0; + + // check recover size + if (2 * tsWalFsyncDataSizeLimit + offset < end) { + wWarn("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64 + ", end:%" PRId64 ", file:%s", + pWal->cfg.vgId, 2 * tsWalFsyncDataSizeLimit, offset, end, fnameStr); + } // search for the valid last WAL entry, e.g. block by block while (1) { - offset = (firstTrial) ? pFileInfo->fileSize : TMAX(0, end - WAL_SCAN_BUF_SIZE); + offset = (firstTrial) ? TMIN(fileSize, offsetForward + stepSize - walCkHeadSz + 1) + : TMAX(0, offsetBackward - stepSize + walCkHeadSz - 1); + end = TMIN(offset + stepSize, fileSize); + if (firstTrial) { + offsetForward = offset; + } else { + offsetBackward = offset; + } + ASSERT(offset <= end); readSize = end - offset; capacity = readSize + sizeof(magic); - int64_t limit = WAL_RECOV_SIZE_LIMIT; - if (limit < readSize) { - wError("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64 - ", end:%" PRId64 ", file:%s", - pWal->cfg.vgId, limit, offset, end, fnameStr); - terrno = TSDB_CODE_WAL_SIZE_LIMIT; - goto _err; - } - void* ptr = taosMemoryRealloc(buf, capacity); if (ptr == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; @@ -127,6 +139,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { } logContent = (SWalCkHead*)(buf + pos); if (walValidHeadCksum(logContent) != 0) { + terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH; wWarn("vgId:%d, failed to validate checksum of wal entry header. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, offset + pos, fnameStr); haystack = buf + pos + 1; @@ -179,46 +192,41 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { } // found one - found = pos; + retVer = logContent->head.version; + lastEntryBeginOffset = offset + pos; + lastEntryEndOffset = offset + pos + sizeof(SWalCkHead) + logContent->head.bodyLen; + + // try next haystack = buf + pos + 1; } - if (found >= 0 || offset == 0) break; - - // go backwards, e.g. by at most one WAL scan buf size - end = TMIN(offset + walCkHeadSz - 1, fileSize); - firstTrial = false; + if (end == fileSize) firstTrial = false; + if (firstTrial && terrno == TSDB_CODE_SUCCESS) continue; + if (retVer >= 0 || offset == 0) break; } - // determine end of last entry - SWalCkHead* lastEntry = (found >= 0) ? (SWalCkHead*)(buf + found) : NULL; - int64_t retVer = -1; - int64_t lastEntryBeginOffset = 0; - int64_t lastEntryEndOffset = 0; - - if (lastEntry == NULL) { + if (retVer < 0) { terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; - } else { - retVer = lastEntry->head.version; - lastEntryBeginOffset = offset + (int64_t)((char*)lastEntry - (char*)buf); - lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen; } // truncate file if (lastEntryEndOffset != fileSize) { wWarn("vgId:%d, repair meta truncate file %s to %" PRId64 ", orig size %" PRId64, pWal->cfg.vgId, fnameStr, lastEntryEndOffset, fileSize); + if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) { wError("failed to truncate file due to %s. file:%s", strerror(errno), fnameStr); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } + if (taosFsyncFile(pFile) < 0) { wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } } + pFileInfo->fileSize = lastEntryEndOffset; taosCloseFile(&pFile); @@ -621,6 +629,7 @@ int walRollFileInfo(SWal* pWal) { pNewInfo->createTs = ts; pNewInfo->closeTs = -1; pNewInfo->fileSize = 0; + pNewInfo->syncedOffset = 0; taosArrayPush(pArray, pNewInfo); taosMemoryFree(pNewInfo); return 0; @@ -771,6 +780,12 @@ static int walFindCurMetaVer(SWal* pWal) { return metaVer; } +void walUpdateSyncedOffset(SWal* pWal) { + SWalFileInfo* pFileInfo = walGetCurFileInfo(pWal); + if (pFileInfo == NULL) return; + pFileInfo->syncedOffset = pFileInfo->fileSize; +} + int walSaveMeta(SWal* pWal) { int metaVer = walFindCurMetaVer(pWal); char fnameStr[WAL_FILE_LEN]; @@ -790,6 +805,9 @@ int walSaveMeta(SWal* pWal) { return -1; } + // update synced offset + (void)walUpdateSyncedOffset(pWal); + // flush to a tmpfile n = walBuildTmpMetaName(pWal, tmpFnameStr); ASSERT(n < sizeof(tmpFnameStr) && "Buffer overflow of file name"); diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 0df1a6b387..1a70a3038f 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -187,6 +187,13 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; } +int32_t walPersist(SWal *pWal) { + taosThreadMutexLock(&pWal->mutex); + int32_t ret = walSaveMeta(pWal); + taosThreadMutexUnlock(&pWal->mutex); + return ret; +} + void walClose(SWal *pWal) { taosThreadMutexLock(&pWal->mutex); (void)walSaveMeta(pWal); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index e016e72471..c723618a4b 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -16,6 +16,7 @@ #include "os.h" #include "taoserror.h" #include "tchecksum.h" +#include "tglobal.h" #include "walInt.h" int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { @@ -252,23 +253,36 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) { } } + if (walGetLastFileCachedSize(pWal) > tsWalFsyncDataSizeLimit) { + if (walSaveMeta(pWal) < 0) { + return -1; + } + } + return 0; } int32_t walBeginSnapshot(SWal *pWal, int64_t ver) { + taosThreadMutexLock(&pWal->mutex); + pWal->vers.verInSnapshotting = ver; wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer); // check file rolling if (pWal->cfg.retentionPeriod == 0) { - taosThreadMutexLock(&pWal->mutex); if (walGetLastFileSize(pWal) != 0) { - walRollImpl(pWal); + if (walRollImpl(pWal) < 0) { + wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr()); + goto _err; + } } - taosThreadMutexUnlock(&pWal->mutex); } - + taosThreadMutexUnlock(&pWal->mutex); return 0; + +_err: + taosThreadMutexUnlock(&pWal->mutex); + return -1; } int32_t walEndSnapshot(SWal *pWal) {