enh/wal_level: new level 0 to disable wal writing

This commit is contained in:
Minglei Jin 2024-06-05 15:19:55 +08:00
parent 9739c9d7d5
commit d6b2bcba68
6 changed files with 78 additions and 58 deletions

View File

@ -45,6 +45,7 @@ extern "C" {
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) #define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
typedef enum { typedef enum {
TAOS_WAL_SKIP = 0,
TAOS_WAL_WRITE = 1, TAOS_WAL_WRITE = 1,
TAOS_WAL_FSYNC = 2, TAOS_WAL_FSYNC = 2,
} EWalType; } EWalType;
@ -151,7 +152,8 @@ struct SWalReader {
TdFilePtr pIdxFile; TdFilePtr pIdxFile;
int64_t curFileFirstVer; int64_t curFileFirstVer;
int64_t curVersion; int64_t curVersion;
int64_t skipToVersion; // skip data and jump to destination version, usually used by stream resume ignoring untreated data int64_t skipToVersion; // skip data and jump to destination version, usually used by stream resume ignoring untreated
// data
int64_t capacity; int64_t capacity;
TdThreadMutex mutex; TdThreadMutex mutex;
SWalFilterCond cond; SWalFilterCond cond;

View File

@ -365,7 +365,7 @@ typedef enum ELogicConditionType {
#define TSDB_MIN_FSYNC_PERIOD 0 #define TSDB_MIN_FSYNC_PERIOD 0
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond #define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second #define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
#define TSDB_MIN_WAL_LEVEL 1 #define TSDB_MIN_WAL_LEVEL 0
#define TSDB_MAX_WAL_LEVEL 2 #define TSDB_MAX_WAL_LEVEL 2
#define TSDB_DEFAULT_WAL_LEVEL 1 #define TSDB_DEFAULT_WAL_LEVEL 1
#define TSDB_MIN_PRECISION TSDB_TIME_PRECISION_MILLI #define TSDB_MIN_PRECISION TSDB_TIME_PRECISION_MILLI

View File

@ -6771,6 +6771,15 @@ static int32_t checkDbEnumOption(STranslateContext* pCxt, const char* pName, int
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t checkDbEnumOption3(STranslateContext* pCxt, const char* pName, int32_t val, int32_t v1, int32_t v2,
int32_t v3) {
if (val >= 0 && val != v1 && val != v2 && val != v3) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option %s: %d, only %d, %d, %d allowed", pName, val, v1, v2, v3);
}
return TSDB_CODE_SUCCESS;
}
static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRetentions, int8_t precision) { static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRetentions, int8_t precision) {
if (NULL == pRetentions) { if (NULL == pRetentions) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -6914,7 +6923,12 @@ static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbNa
if ((pOptions->replica == 2) ^ (pOptions->withArbitrator == TSDB_MAX_DB_WITH_ARBITRATOR)) { if ((pOptions->replica == 2) ^ (pOptions->withArbitrator == TSDB_MAX_DB_WITH_ARBITRATOR)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION, return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid database option, with_arbitrator should be used with replica 2"); "Invalid option, with_arbitrator should be used with replica 2");
}
if (pOptions->replica > 1 && pOptions->walLevel == 0) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option, wal_level 0 should be used with replica 1");
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -6978,7 +6992,8 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
code = checkDbStrictOption(pCxt, pOptions); code = checkDbStrictOption(pCxt, pOptions);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkDbEnumOption(pCxt, "walLevel", pOptions->walLevel, TSDB_MIN_WAL_LEVEL, TSDB_MAX_WAL_LEVEL); code = checkDbEnumOption3(pCxt, "walLevel", pOptions->walLevel, TSDB_MIN_WAL_LEVEL, TSDB_DEFAULT_WAL_LEVEL,
TSDB_MAX_WAL_LEVEL);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkDbRangeOption(pCxt, "vgroups", pOptions->numOfVgroups, TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); code = checkDbRangeOption(pCxt, "vgroups", pOptions->numOfVgroups, TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);

View File

@ -20,7 +20,6 @@
#include "tutil.h" #include "tutil.h"
#include "walInt.h" #include "walInt.h"
bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) { bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver; return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver;
} }
@ -226,7 +225,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
goto _err; goto _err;
} }
if (taosFsyncFile(pFile) < 0) { if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pFile) < 0) {
wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr); wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
@ -636,7 +635,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
pWal->cfg.vgId, terrstr(), idxEntry.ver, idxEntry.offset, fLogNameStr); pWal->cfg.vgId, terrstr(), idxEntry.ver, idxEntry.offset, fLogNameStr);
goto _err; goto _err;
} }
if (taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) { if (pWal->cfg.level != TAOS_WAL_SKIP && taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, failed to append file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr); wError("vgId:%d, failed to append file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
goto _err; goto _err;
@ -644,7 +643,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
count++; count++;
} }
if (taosFsyncFile(pIdxFile) < 0) { if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pIdxFile) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, faild to fsync file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr); wError("vgId:%d, faild to fsync file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
goto _err; goto _err;
@ -880,13 +879,13 @@ int walSaveMeta(SWal* pWal) {
int n; int n;
// fsync the idx and log file at first to ensure validity of meta // fsync the idx and log file at first to ensure validity of meta
if (taosFsyncFile(pWal->pIdxFile) < 0) { if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pIdxFile) < 0) {
wError("vgId:%d, failed to sync idx file due to %s", pWal->cfg.vgId, strerror(errno)); wError("vgId:%d, failed to sync idx file due to %s", pWal->cfg.vgId, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (taosFsyncFile(pWal->pLogFile) < 0) { if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pLogFile) < 0) {
wError("vgId:%d, failed to sync log file due to %s", pWal->cfg.vgId, strerror(errno)); wError("vgId:%d, failed to sync log file due to %s", pWal->cfg.vgId, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
@ -901,7 +900,8 @@ int walSaveMeta(SWal* pWal) {
return -1; return -1;
} }
TdFilePtr pMetaFile = taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); TdFilePtr pMetaFile =
taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
if (pMetaFile == NULL) { if (pMetaFile == NULL) {
wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr); wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -910,13 +910,13 @@ int walSaveMeta(SWal* pWal) {
char* serialized = walMetaSerialize(pWal); char* serialized = walMetaSerialize(pWal);
int len = strlen(serialized); int len = strlen(serialized);
if (len != taosWriteFile(pMetaFile, serialized, len)) { if (pWal->cfg.level != TAOS_WAL_SKIP && len != taosWriteFile(pMetaFile, serialized, len)) {
wError("vgId:%d, failed to write file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr); wError("vgId:%d, failed to write file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
if (taosFsyncFile(pMetaFile) < 0) { if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pMetaFile) < 0) {
wError("vgId:%d, failed to sync file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr); wError("vgId:%d, failed to sync file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;

View File

@ -79,8 +79,7 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) {
TdFilePtr pIdxTFile, pLogTFile; TdFilePtr pIdxTFile, pLogTFile;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
if (pWal->pLogFile != NULL) { if (pWal->pLogFile != NULL) {
code = taosFsyncFile(pWal->pLogFile); if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pLogFile)) != 0) {
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
@ -91,8 +90,7 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) {
} }
} }
if (pWal->pIdxFile != NULL) { if (pWal->pIdxFile != NULL) {
code = taosFsyncFile(pWal->pIdxFile); if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pIdxFile)) != 0) {
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }

View File

@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "crypt.h"
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tglobal.h" #include "tglobal.h"
#include "walInt.h" #include "walInt.h"
#include "crypt.h"
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
@ -396,8 +396,7 @@ int32_t walRollImpl(SWal *pWal) {
int32_t code = 0; int32_t code = 0;
if (pWal->pIdxFile != NULL) { if (pWal->pIdxFile != NULL) {
code = taosFsyncFile(pWal->pIdxFile); if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pIdxFile)) != 0) {
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto END; goto END;
} }
@ -409,8 +408,7 @@ int32_t walRollImpl(SWal *pWal) {
} }
if (pWal->pLogFile != NULL) { if (pWal->pLogFile != NULL) {
code = taosFsyncFile(pWal->pLogFile); if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pLogFile)) != 0) {
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto END; goto END;
} }
@ -510,12 +508,15 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
wDebug("vgId:%d, wal write log %" PRId64 ", msgType: %s, cksum head %u cksum body %u", pWal->cfg.vgId, index, wDebug("vgId:%d, wal write log %" PRId64 ", msgType: %s, cksum head %u cksum body %u", pWal->cfg.vgId, index,
TMSG_INFO(msgType), pWal->writeHead.cksumHead, pWal->writeHead.cksumBody); TMSG_INFO(msgType), pWal->writeHead.cksumHead, pWal->writeHead.cksumBody);
if (pWal->cfg.level != TAOS_WAL_SKIP) {
code = walWriteIndex(pWal, index, offset); code = walWriteIndex(pWal, index, offset);
if (code < 0) { if (code < 0) {
goto END; goto END;
} }
}
if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) { if (pWal->cfg.level != TAOS_WAL_SKIP &&
taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno)); strerror(errno));
@ -565,7 +566,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
buf = newBodyEncrypted; buf = newBodyEncrypted;
} }
if (taosWriteFile(pWal->pLogFile, (char *)buf, cyptedBodyLen) != cyptedBodyLen) { if (pWal->cfg.level != TAOS_WAL_SKIP && taosWriteFile(pWal->pLogFile, (char *)buf, cyptedBodyLen) != cyptedBodyLen) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno)); strerror(errno));
@ -693,6 +694,10 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
} }
void walFsync(SWal *pWal, bool forceFsync) { void walFsync(SWal *pWal, bool forceFsync) {
if (pWal->cfg.level == TAOS_WAL_SKIP) {
return;
}
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal)); wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));