Merge pull request #26048 from taosdata/enh/TS-4885

enh/wal_level: new level 0 to disable wal writing
This commit is contained in:
Hongze Cheng 2024-06-06 10:19:50 +08:00 committed by GitHub
commit 070ec1412e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 93 additions and 64 deletions

View File

@ -45,6 +45,7 @@ extern "C" {
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
typedef enum {
TAOS_WAL_SKIP = 0,
TAOS_WAL_WRITE = 1,
TAOS_WAL_FSYNC = 2,
} EWalType;
@ -145,17 +146,18 @@ typedef struct SWalReader SWalReader;
// todo hide this struct
struct SWalReader {
SWal *pWal;
int64_t readerId;
TdFilePtr pLogFile;
TdFilePtr pIdxFile;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t skipToVersion; // skip data and jump to destination version, usually used by stream resume ignoring untreated data
SWal *pWal;
int64_t readerId;
TdFilePtr pLogFile;
TdFilePtr pIdxFile;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t skipToVersion; // skip data and jump to destination version, usually used by stream resume ignoring untreated
// data
int64_t capacity;
TdThreadMutex mutex;
SWalFilterCond cond;
SWalCkHead *pHead;
SWalCkHead *pHead;
};
// module initialization
@ -198,7 +200,7 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id);
void walCloseReader(SWalReader *pRead);
void walReadReset(SWalReader *pReader);
int32_t walReadVer(SWalReader *pRead, int64_t ver);
void decryptBody(SWalCfg* cfg, SWalCkHead* pHead, int32_t plainBodyLen, const char* func);
void decryptBody(SWalCfg *cfg, SWalCkHead *pHead, int32_t plainBodyLen, const char *func);
int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead);
int64_t walReaderGetCurrentVer(const SWalReader *pReader);
@ -206,7 +208,7 @@ int64_t walReaderGetValidFirstVer(const SWalReader *pReader);
int64_t walReaderGetSkipToVersion(SWalReader *pReader);
void walReaderSetSkipToVersion(SWalReader *pReader, int64_t ver);
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever);
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset);
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset);
// only for tq usage
int32_t walFetchHead(SWalReader *pRead, int64_t ver);

View File

@ -365,7 +365,7 @@ typedef enum ELogicConditionType {
#define TSDB_MIN_FSYNC_PERIOD 0
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
#define TSDB_MIN_WAL_LEVEL 1
#define TSDB_MIN_WAL_LEVEL 0
#define TSDB_MAX_WAL_LEVEL 2
#define TSDB_DEFAULT_WAL_LEVEL 1
#define TSDB_MIN_PRECISION TSDB_TIME_PRECISION_MILLI

View File

@ -6771,6 +6771,15 @@ static int32_t checkDbEnumOption(STranslateContext* pCxt, const char* pName, int
return TSDB_CODE_SUCCESS;
}
static int32_t checkDbEnumOption3(STranslateContext* pCxt, const char* pName, int32_t val, int32_t v1, int32_t v2,
int32_t v3) {
if (val >= 0 && val != v1 && val != v2 && val != v3) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option %s: %d, only %d, %d, %d allowed", pName, val, v1, v2, v3);
}
return TSDB_CODE_SUCCESS;
}
static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRetentions, int8_t precision) {
if (NULL == pRetentions) {
return TSDB_CODE_SUCCESS;
@ -6914,7 +6923,12 @@ static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbNa
if ((pOptions->replica == 2) ^ (pOptions->withArbitrator == TSDB_MAX_DB_WITH_ARBITRATOR)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid database option, with_arbitrator should be used with replica 2");
"Invalid option, with_arbitrator should be used with replica 2");
}
if (pOptions->replica > 1 && pOptions->walLevel == 0) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option, wal_level 0 should be used with replica 1");
}
return TSDB_CODE_SUCCESS;
@ -6978,7 +6992,8 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
code = checkDbStrictOption(pCxt, pOptions);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkDbEnumOption(pCxt, "walLevel", pOptions->walLevel, TSDB_MIN_WAL_LEVEL, TSDB_MAX_WAL_LEVEL);
code = checkDbEnumOption3(pCxt, "walLevel", pOptions->walLevel, TSDB_MIN_WAL_LEVEL, TSDB_DEFAULT_WAL_LEVEL,
TSDB_MAX_WAL_LEVEL);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkDbRangeOption(pCxt, "vgroups", pOptions->numOfVgroups, TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
@ -7252,6 +7267,15 @@ static void buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt,
}
static int32_t translateAlterDatabase(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt) {
if (pStmt->pOptions->walLevel == 0) {
SDbCfgInfo dbCfg = {0};
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
if (TSDB_CODE_SUCCESS == code && dbCfg.replications > 1) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option, wal_level 0 should be used with replica 1");
}
}
int32_t code = checkDatabaseOptions(pCxt, pStmt->dbName, pStmt->pOptions);
if (TSDB_CODE_SUCCESS != code) {
return code;

View File

@ -335,7 +335,7 @@ TEST_F(ParserInitialATest, alterDatabaseSemanticCheck) {
run("ALTER DATABASE test KEEP 1000000000s", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test KEEP 1w", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test PAGES 63", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test WAL_LEVEL 0", TSDB_CODE_PAR_INVALID_DB_OPTION);
//run("ALTER DATABASE test WAL_LEVEL 0", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test WAL_LEVEL 3", TSDB_CODE_PAR_INVALID_DB_OPTION);
//run("ALTER DATABASE test REPLICA 2", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test STT_TRIGGER 0", TSDB_CODE_PAR_INVALID_DB_OPTION);

View File

@ -20,7 +20,6 @@
#include "tutil.h"
#include "walInt.h"
bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver;
}
@ -154,7 +153,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
// validate body
int32_t cryptedBodyLen = logContent->head.bodyLen;
if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
}
recordLen = walCkHeadSz + cryptedBodyLen;
@ -226,7 +225,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
goto _err;
}
if (taosFsyncFile(pFile) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pFile) < 0) {
wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
@ -626,7 +625,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
int32_t plainBodyLen = ckHead.head.bodyLen;
int32_t cryptedBodyLen = plainBodyLen;
if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
}
idxEntry.offset += sizeof(SWalCkHead) + cryptedBodyLen;
@ -636,7 +635,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
pWal->cfg.vgId, terrstr(), idxEntry.ver, idxEntry.offset, fLogNameStr);
goto _err;
}
if (taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, failed to append file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
goto _err;
@ -644,7 +643,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
count++;
}
if (taosFsyncFile(pIdxFile) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pIdxFile) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, faild to fsync file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
goto _err;
@ -880,13 +879,13 @@ int walSaveMeta(SWal* pWal) {
int n;
// fsync the idx and log file at first to ensure validity of meta
if (taosFsyncFile(pWal->pIdxFile) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pIdxFile) < 0) {
wError("vgId:%d, failed to sync idx file due to %s", pWal->cfg.vgId, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (taosFsyncFile(pWal->pLogFile) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pLogFile) < 0) {
wError("vgId:%d, failed to sync log file due to %s", pWal->cfg.vgId, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
@ -901,7 +900,8 @@ int walSaveMeta(SWal* pWal) {
return -1;
}
TdFilePtr pMetaFile = taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
TdFilePtr pMetaFile =
taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
if (pMetaFile == NULL) {
wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
@ -910,13 +910,13 @@ int walSaveMeta(SWal* pWal) {
char* serialized = walMetaSerialize(pWal);
int len = strlen(serialized);
if (len != taosWriteFile(pMetaFile, serialized, len)) {
if (pWal->cfg.level != TAOS_WAL_SKIP && len != taosWriteFile(pMetaFile, serialized, len)) {
wError("vgId:%d, failed to write file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile(pMetaFile) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pMetaFile) < 0) {
wError("vgId:%d, failed to sync file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;

View File

@ -79,8 +79,7 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) {
TdFilePtr pIdxTFile, pLogTFile;
char fnameStr[WAL_FILE_LEN];
if (pWal->pLogFile != NULL) {
code = taosFsyncFile(pWal->pLogFile);
if (code != 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pLogFile)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
@ -91,8 +90,7 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) {
}
}
if (pWal->pIdxFile != NULL) {
code = taosFsyncFile(pWal->pIdxFile);
if (code != 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pIdxFile)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}

View File

@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "crypt.h"
#include "os.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tglobal.h"
#include "walInt.h"
#include "crypt.h"
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
taosThreadMutexLock(&pWal->mutex);
@ -295,9 +295,9 @@ int32_t walEndSnapshot(SWal *pWal) {
ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1);
// compatible mode for refVer
bool hasTopic = false;
bool hasTopic = false;
int64_t refVer = INT64_MAX;
void *pIter = NULL;
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pWal->pRefHash, pIter);
if (pIter == NULL) break;
@ -396,8 +396,7 @@ int32_t walRollImpl(SWal *pWal) {
int32_t code = 0;
if (pWal->pIdxFile != NULL) {
code = taosFsyncFile(pWal->pIdxFile);
if (code != 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pIdxFile)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto END;
}
@ -409,8 +408,7 @@ int32_t walRollImpl(SWal *pWal) {
}
if (pWal->pLogFile != NULL) {
code = taosFsyncFile(pWal->pLogFile);
if (code != 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pLogFile)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto END;
}
@ -510,12 +508,15 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
wDebug("vgId:%d, wal write log %" PRId64 ", msgType: %s, cksum head %u cksum body %u", pWal->cfg.vgId, index,
TMSG_INFO(msgType), pWal->writeHead.cksumHead, pWal->writeHead.cksumBody);
code = walWriteIndex(pWal, index, offset);
if (code < 0) {
goto END;
if (pWal->cfg.level != TAOS_WAL_SKIP) {
code = walWriteIndex(pWal, index, offset);
if (code < 0) {
goto END;
}
}
if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
if (pWal->cfg.level != TAOS_WAL_SKIP &&
taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
@ -524,17 +525,17 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
}
int32_t cyptedBodyLen = plainBodyLen;
char* buf = (char*)body;
char* newBody = NULL;
char* newBodyEncrypted = NULL;
char *buf = (char *)body;
char *newBody = NULL;
char *newBodyEncrypted = NULL;
if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
cyptedBodyLen = ENCRYPTED_LEN(cyptedBodyLen);
newBody = taosMemoryMalloc(cyptedBodyLen);
if(newBody == NULL){
if (newBody == NULL) {
wError("vgId:%d, file:%" PRId64 ".log, failed to malloc since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
strerror(errno));
code = -1;
goto END;
}
@ -542,11 +543,11 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
memcpy(newBody, body, plainBodyLen);
newBodyEncrypted = taosMemoryMalloc(cyptedBodyLen);
if(newBodyEncrypted == NULL){
if (newBodyEncrypted == NULL) {
wError("vgId:%d, file:%" PRId64 ".log, failed to malloc since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
strerror(errno));
code = -1;
if(newBody != NULL) taosMemoryFreeClear(newBody);
if (newBody != NULL) taosMemoryFreeClear(newBody);
goto END;
}
@ -559,29 +560,29 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
int32_t count = CBC_Encrypt(&opts);
//wDebug("vgId:%d, file:%" PRId64 ".log, index:%" PRId64 ", CBC_Encrypt cryptedBodyLen:%d, plainBodyLen:%d, %s",
// pWal->cfg.vgId, walGetLastFileFirstVer(pWal), index, count, plainBodyLen, __FUNCTION__);
// wDebug("vgId:%d, file:%" PRId64 ".log, index:%" PRId64 ", CBC_Encrypt cryptedBodyLen:%d, plainBodyLen:%d, %s",
// pWal->cfg.vgId, walGetLastFileFirstVer(pWal), index, count, plainBodyLen, __FUNCTION__);
buf = newBodyEncrypted;
}
if (taosWriteFile(pWal->pLogFile, (char *)buf, cyptedBodyLen) != cyptedBodyLen) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosWriteFile(pWal->pLogFile, (char *)buf, cyptedBodyLen) != cyptedBodyLen) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
code = -1;
if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
taosMemoryFreeClear(newBody);
taosMemoryFreeClear(newBodyEncrypted);
}
goto END;
}
if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
taosMemoryFreeClear(newBody);
taosMemoryFreeClear(newBodyEncrypted);
//wInfo("vgId:%d, free newBody newBodyEncrypted %s",
// pWal->cfg.vgId, __FUNCTION__);
taosMemoryFreeClear(newBodyEncrypted);
// wInfo("vgId:%d, free newBody newBodyEncrypted %s",
// pWal->cfg.vgId, __FUNCTION__);
}
// set status
@ -693,6 +694,10 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
}
void walFsync(SWal *pWal, bool forceFsync) {
if (pWal->cfg.level == TAOS_WAL_SKIP) {
return;
}
taosThreadMutexLock(&pWal->mutex);
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));

View File

@ -49,7 +49,7 @@ print ============= create database with all options
# | KEEP value [max(1d ~ 365000d), default: 1d, unit may be minut/hour/day]
# | PRECISION ['ms' | 'us' | 'ns', default: ms]
# | REPLICA value [1 | 3, default: 1]
# | WAL_LEVEL value [1 | 2, default: 1]
# | WAL_LEVEL value [0 | 1 | 2, default: 1]
# | VGROUPS value [default: 2]
# | SINGLE_STABLE [0 | 1, default: ]
#
@ -404,7 +404,7 @@ endi
sql drop database db
sql_error create database db WAL_LEVEL 3
sql_error create database db WAL_LEVEL -1
sql_error create database db WAL_LEVEL 0
#sql_error create database db WAL_LEVEL 0
print ====> VGROUPS value [1~4096, default: 2]
sql create database db VGROUPS 1

View File

@ -225,7 +225,7 @@ sql_error create database $db ctime 29
sql_error create database $db ctime 40961
# wal {0, 2}
sql_error create database testwal wal_level 0
#sql_error create database testwal wal_level 0
sql select * from information_schema.ins_databases
if $rows != 2 then
return -1

View File

@ -148,7 +148,7 @@ class TDTestCase:
@property
def fsync_create_err(self):
return [
"create database db1 wal_level 0",
#"create database db1 wal_level 0",
"create database db1 wal_level 3",
"create database db1 wal_level null",
"create database db1 wal_level true",
@ -162,7 +162,7 @@ class TDTestCase:
@property
def fsync_alter_err(self):
return [
"alter database db1 wal_level 0",
#"alter database db1 wal_level 0",
"alter database db1 wal_level 3",
"alter database db1 wal_level null",
"alter database db1 wal_level true",