From d6b2bcba68aa27c97a0ccb6e410e6e0ae8f33fc2 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 5 Jun 2024 15:19:55 +0800 Subject: [PATCH] enh/wal_level: new level 0 to disable wal writing --- include/libs/wal/wal.h | 22 +++++---- include/util/tdef.h | 2 +- source/libs/parser/src/parTranslater.c | 19 +++++++- source/libs/wal/src/walMeta.c | 22 ++++----- source/libs/wal/src/walSeek.c | 6 +-- source/libs/wal/src/walWrite.c | 65 ++++++++++++++------------ 6 files changed, 78 insertions(+), 58 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 25566d5acb..7f779609eb 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -45,6 +45,7 @@ extern "C" { #define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) typedef enum { + TAOS_WAL_SKIP = 0, TAOS_WAL_WRITE = 1, TAOS_WAL_FSYNC = 2, } EWalType; @@ -145,17 +146,18 @@ typedef struct SWalReader SWalReader; // todo hide this struct struct SWalReader { - SWal *pWal; - int64_t readerId; - TdFilePtr pLogFile; - TdFilePtr pIdxFile; - int64_t curFileFirstVer; - int64_t curVersion; - int64_t skipToVersion; // skip data and jump to destination version, usually used by stream resume ignoring untreated data + SWal *pWal; + int64_t readerId; + TdFilePtr pLogFile; + TdFilePtr pIdxFile; + int64_t curFileFirstVer; + int64_t curVersion; + int64_t skipToVersion; // skip data and jump to destination version, usually used by stream resume ignoring untreated + // data int64_t capacity; TdThreadMutex mutex; SWalFilterCond cond; - SWalCkHead *pHead; + SWalCkHead *pHead; }; // module initialization @@ -198,7 +200,7 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id); void walCloseReader(SWalReader *pRead); void walReadReset(SWalReader *pReader); int32_t walReadVer(SWalReader *pRead, int64_t ver); -void decryptBody(SWalCfg* cfg, SWalCkHead* pHead, int32_t plainBodyLen, const char* func); +void decryptBody(SWalCfg *cfg, SWalCkHead *pHead, int32_t plainBodyLen, const char *func); int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver); int32_t walNextValidMsg(SWalReader *pRead); int64_t walReaderGetCurrentVer(const SWalReader *pReader); @@ -206,7 +208,7 @@ int64_t walReaderGetValidFirstVer(const SWalReader *pReader); int64_t walReaderGetSkipToVersion(SWalReader *pReader); void walReaderSetSkipToVersion(SWalReader *pReader, int64_t ver); void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever); -void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset); +void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset); // only for tq usage int32_t walFetchHead(SWalReader *pRead, int64_t ver); diff --git a/include/util/tdef.h b/include/util/tdef.h index 6ded54dc00..efcd9f31e9 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -365,7 +365,7 @@ typedef enum ELogicConditionType { #define TSDB_MIN_FSYNC_PERIOD 0 #define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond #define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second -#define TSDB_MIN_WAL_LEVEL 1 +#define TSDB_MIN_WAL_LEVEL 0 #define TSDB_MAX_WAL_LEVEL 2 #define TSDB_DEFAULT_WAL_LEVEL 1 #define TSDB_MIN_PRECISION TSDB_TIME_PRECISION_MILLI diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index b6673d6219..b4e0c96703 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6771,6 +6771,15 @@ static int32_t checkDbEnumOption(STranslateContext* pCxt, const char* pName, int return TSDB_CODE_SUCCESS; } +static int32_t checkDbEnumOption3(STranslateContext* pCxt, const char* pName, int32_t val, int32_t v1, int32_t v2, + int32_t v3) { + if (val >= 0 && val != v1 && val != v2 && val != v3) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION, + "Invalid option %s: %d, only %d, %d, %d allowed", pName, val, v1, v2, v3); + } + return TSDB_CODE_SUCCESS; +} + static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRetentions, int8_t precision) { if (NULL == pRetentions) { return TSDB_CODE_SUCCESS; @@ -6914,7 +6923,12 @@ static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbNa if ((pOptions->replica == 2) ^ (pOptions->withArbitrator == TSDB_MAX_DB_WITH_ARBITRATOR)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION, - "Invalid database option, with_arbitrator should be used with replica 2"); + "Invalid option, with_arbitrator should be used with replica 2"); + } + + if (pOptions->replica > 1 && pOptions->walLevel == 0) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION, + "Invalid option, wal_level 0 should be used with replica 1"); } return TSDB_CODE_SUCCESS; @@ -6978,7 +6992,8 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName code = checkDbStrictOption(pCxt, pOptions); } if (TSDB_CODE_SUCCESS == code) { - code = checkDbEnumOption(pCxt, "walLevel", pOptions->walLevel, TSDB_MIN_WAL_LEVEL, TSDB_MAX_WAL_LEVEL); + code = checkDbEnumOption3(pCxt, "walLevel", pOptions->walLevel, TSDB_MIN_WAL_LEVEL, TSDB_DEFAULT_WAL_LEVEL, + TSDB_MAX_WAL_LEVEL); } if (TSDB_CODE_SUCCESS == code) { code = checkDbRangeOption(pCxt, "vgroups", pOptions->numOfVgroups, TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 08162ef158..ac33b1d6b7 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -20,7 +20,6 @@ #include "tutil.h" #include "walInt.h" - bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) { return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver; } @@ -154,7 +153,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { // validate body int32_t cryptedBodyLen = logContent->head.bodyLen; - if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){ + if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) { cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen); } recordLen = walCkHeadSz + cryptedBodyLen; @@ -226,7 +225,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { goto _err; } - if (taosFsyncFile(pFile) < 0) { + if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pFile) < 0) { wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -626,7 +625,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) { int32_t plainBodyLen = ckHead.head.bodyLen; int32_t cryptedBodyLen = plainBodyLen; - if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){ + if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) { cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen); } idxEntry.offset += sizeof(SWalCkHead) + cryptedBodyLen; @@ -636,7 +635,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) { pWal->cfg.vgId, terrstr(), idxEntry.ver, idxEntry.offset, fLogNameStr); goto _err; } - if (taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) { + if (pWal->cfg.level != TAOS_WAL_SKIP && taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, failed to append file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr); goto _err; @@ -644,7 +643,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) { count++; } - if (taosFsyncFile(pIdxFile) < 0) { + if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pIdxFile) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, faild to fsync file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr); goto _err; @@ -880,13 +879,13 @@ int walSaveMeta(SWal* pWal) { int n; // fsync the idx and log file at first to ensure validity of meta - if (taosFsyncFile(pWal->pIdxFile) < 0) { + if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pIdxFile) < 0) { wError("vgId:%d, failed to sync idx file due to %s", pWal->cfg.vgId, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - if (taosFsyncFile(pWal->pLogFile) < 0) { + if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pLogFile) < 0) { wError("vgId:%d, failed to sync log file due to %s", pWal->cfg.vgId, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -901,7 +900,8 @@ int walSaveMeta(SWal* pWal) { return -1; } - TdFilePtr pMetaFile = taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); + TdFilePtr pMetaFile = + taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); if (pMetaFile == NULL) { wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr); terrno = TAOS_SYSTEM_ERROR(errno); @@ -910,13 +910,13 @@ int walSaveMeta(SWal* pWal) { char* serialized = walMetaSerialize(pWal); int len = strlen(serialized); - if (len != taosWriteFile(pMetaFile, serialized, len)) { + if (pWal->cfg.level != TAOS_WAL_SKIP && len != taosWriteFile(pMetaFile, serialized, len)) { wError("vgId:%d, failed to write file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosFsyncFile(pMetaFile) < 0) { + if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pMetaFile) < 0) { wError("vgId:%d, failed to sync file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index cbfd0ef741..0e452a937b 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -79,8 +79,7 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) { TdFilePtr pIdxTFile, pLogTFile; char fnameStr[WAL_FILE_LEN]; if (pWal->pLogFile != NULL) { - code = taosFsyncFile(pWal->pLogFile); - if (code != 0) { + if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pLogFile)) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -91,8 +90,7 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) { } } if (pWal->pIdxFile != NULL) { - code = taosFsyncFile(pWal->pIdxFile); - if (code != 0) { + if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pIdxFile)) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 25fec4a998..19345e0644 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -13,12 +13,12 @@ * along with this program. If not, see . */ +#include "crypt.h" #include "os.h" #include "taoserror.h" #include "tchecksum.h" #include "tglobal.h" #include "walInt.h" -#include "crypt.h" int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { taosThreadMutexLock(&pWal->mutex); @@ -295,9 +295,9 @@ int32_t walEndSnapshot(SWal *pWal) { ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1); // compatible mode for refVer - bool hasTopic = false; + bool hasTopic = false; int64_t refVer = INT64_MAX; - void *pIter = NULL; + void *pIter = NULL; while (1) { pIter = taosHashIterate(pWal->pRefHash, pIter); if (pIter == NULL) break; @@ -396,8 +396,7 @@ int32_t walRollImpl(SWal *pWal) { int32_t code = 0; if (pWal->pIdxFile != NULL) { - code = taosFsyncFile(pWal->pIdxFile); - if (code != 0) { + if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pIdxFile)) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); goto END; } @@ -409,8 +408,7 @@ int32_t walRollImpl(SWal *pWal) { } if (pWal->pLogFile != NULL) { - code = taosFsyncFile(pWal->pLogFile); - if (code != 0) { + if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pLogFile)) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); goto END; } @@ -510,12 +508,15 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy wDebug("vgId:%d, wal write log %" PRId64 ", msgType: %s, cksum head %u cksum body %u", pWal->cfg.vgId, index, TMSG_INFO(msgType), pWal->writeHead.cksumHead, pWal->writeHead.cksumBody); - code = walWriteIndex(pWal, index, offset); - if (code < 0) { - goto END; + if (pWal->cfg.level != TAOS_WAL_SKIP) { + code = walWriteIndex(pWal, index, offset); + if (code < 0) { + goto END; + } } - if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) { + if (pWal->cfg.level != TAOS_WAL_SKIP && + taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) { terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); @@ -524,17 +525,17 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy } int32_t cyptedBodyLen = plainBodyLen; - char* buf = (char*)body; - char* newBody = NULL; - char* newBodyEncrypted = NULL; + char *buf = (char *)body; + char *newBody = NULL; + char *newBodyEncrypted = NULL; - if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){ + if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) { cyptedBodyLen = ENCRYPTED_LEN(cyptedBodyLen); - + newBody = taosMemoryMalloc(cyptedBodyLen); - if(newBody == NULL){ + if (newBody == NULL) { wError("vgId:%d, file:%" PRId64 ".log, failed to malloc since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), - strerror(errno)); + strerror(errno)); code = -1; goto END; } @@ -542,11 +543,11 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy memcpy(newBody, body, plainBodyLen); newBodyEncrypted = taosMemoryMalloc(cyptedBodyLen); - if(newBodyEncrypted == NULL){ + if (newBodyEncrypted == NULL) { wError("vgId:%d, file:%" PRId64 ".log, failed to malloc since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), - strerror(errno)); + strerror(errno)); code = -1; - if(newBody != NULL) taosMemoryFreeClear(newBody); + if (newBody != NULL) taosMemoryFreeClear(newBody); goto END; } @@ -559,29 +560,29 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy int32_t count = CBC_Encrypt(&opts); - //wDebug("vgId:%d, file:%" PRId64 ".log, index:%" PRId64 ", CBC_Encrypt cryptedBodyLen:%d, plainBodyLen:%d, %s", - // pWal->cfg.vgId, walGetLastFileFirstVer(pWal), index, count, plainBodyLen, __FUNCTION__); + // wDebug("vgId:%d, file:%" PRId64 ".log, index:%" PRId64 ", CBC_Encrypt cryptedBodyLen:%d, plainBodyLen:%d, %s", + // pWal->cfg.vgId, walGetLastFileFirstVer(pWal), index, count, plainBodyLen, __FUNCTION__); buf = newBodyEncrypted; } - - if (taosWriteFile(pWal->pLogFile, (char *)buf, cyptedBodyLen) != cyptedBodyLen) { + + if (pWal->cfg.level != TAOS_WAL_SKIP && taosWriteFile(pWal->pLogFile, (char *)buf, cyptedBodyLen) != cyptedBodyLen) { terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); code = -1; - if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){ + if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) { taosMemoryFreeClear(newBody); taosMemoryFreeClear(newBodyEncrypted); } goto END; } - if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){ + if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) { taosMemoryFreeClear(newBody); - taosMemoryFreeClear(newBodyEncrypted); - //wInfo("vgId:%d, free newBody newBodyEncrypted %s", - // pWal->cfg.vgId, __FUNCTION__); + taosMemoryFreeClear(newBodyEncrypted); + // wInfo("vgId:%d, free newBody newBodyEncrypted %s", + // pWal->cfg.vgId, __FUNCTION__); } // set status @@ -693,6 +694,10 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in } void walFsync(SWal *pWal, bool forceFsync) { + if (pWal->cfg.level == TAOS_WAL_SKIP) { + return; + } + taosThreadMutexLock(&pWal->mutex); if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));