Merge pull request #29417 from taosdata/enh/3.0/TD-33266-2

Enh(coverage):config&wal
This commit is contained in:
Hongze Cheng 2024-12-31 14:54:44 +08:00 committed by GitHub
commit 23d5f184c5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 333 additions and 375 deletions

View File

@ -160,8 +160,6 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char *apolloUrl
SArray *taosGetLocalCfg(SConfig *pCfg);
SArray *taosGetGlobalCfg(SConfig *pCfg);
void taosSetLocalCfg(SConfig *pCfg, SArray *pArray);
void taosSetGlobalCfg(SConfig *pCfg, SArray *pArray);
#ifdef __cplusplus
}
#endif

View File

@ -187,6 +187,15 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
} \
} while (0)
#define TAOS_CHECK_RETURN_SET_CODE(CMD, CODE, ERRNO) \
do { \
int32_t __c = (CMD); \
if (__c != TSDB_CODE_SUCCESS) { \
(CODE) = (ERRNO); \
TAOS_RETURN(__c); \
} \
} while (0)
#define TAOS_CHECK_RETURN_WITH_RELEASE(CMD, PTR1, PTR2) \
do { \
int32_t __c = (CMD); \
@ -225,6 +234,16 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
} \
} while (0)
#define TAOS_CHECK_EXIT_SET_CODE(CMD, CODE, ERRNO) \
do { \
code = (CMD); \
if (code < TSDB_CODE_SUCCESS) { \
(CODE) = (ERRNO); \
lino = __LINE__; \
goto _exit; \
} \
} while (0)
#define TAOS_UNUSED(expr) (void)(expr)
bool taosIsBigChar(char c);

View File

@ -14,12 +14,12 @@
*/
#define _DEFAULT_SOURCE
#include "tglobal.h"
#include "cJSON.h"
#include "defines.h"
#include "os.h"
#include "osString.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tgrant.h"
#include "tjson.h"
#include "tlog.h"
@ -825,7 +825,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
// clang-format off
TAOS_CHECK_RETURN(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE, CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER, CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE, CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER, CFG_CATEGORY_LOCAL));
@ -858,9 +858,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "syncLogBufferMemoryAllowed", tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL));
@ -911,7 +911,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushIntervalSec, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "ttlBatchDropNum", tsTtlBatchDropNum, 0, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "ttlChangeOnWrite", tsTtlChangeOnWrite, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
@ -919,11 +919,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "trimVDbIntervalSec", tsTrimVDbIntervalSec, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "s3MigrateIntervalSec", tsS3MigrateIntervalSec, 600, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "s3MigrateEnabled", tsS3MigrateEnabled, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "udf", tsStartUdfd, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_GLOBAL));
@ -939,11 +939,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "fPrecision", tsFPrecision, 0.0f, 100000.0f, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
SConfigItem *pItem = NULL;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "fPrecision");
tsFPrecision = pItem->fval;
TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "dPrecision", tsDPrecision, 0.0f, 1000000.0f, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "fPrecision", tsFPrecision, 0.0f, 100000.0f, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "dPrecision", tsDPrecision, 0.0f, 1000000.0f, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "maxRange", tsMaxRange, 0, 65536, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "curRange", tsCurRange, 0, 65536, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "ifAdtFse", tsIfAdtFse, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_GLOBAL));
@ -1932,7 +1929,7 @@ int32_t taosReadDataFolder(const char *cfgDir, const char **envCmd, const char *
SConfig *pCfg = NULL;
TAOS_CHECK_RETURN(cfgInit(&pCfg));
TAOS_CHECK_GOTO(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE, CFG_CATEGORY_LOCAL), &lino,
TAOS_CHECK_GOTO(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER, CFG_CATEGORY_LOCAL), &lino,
_exit);
TAOS_CHECK_GOTO(
cfgAddInt32(pCfg, "debugFlag", dDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER, CFG_CATEGORY_LOCAL), &lino,

View File

@ -156,9 +156,11 @@ static inline void walResetVer(SWalVer* pVer) {
int32_t walLoadMeta(SWal* pWal);
int32_t walSaveMeta(SWal* pWal);
int32_t walRemoveMeta(SWal* pWal);
int32_t walRollImpl(SWal* pWal);
int32_t walRollFileInfo(SWal* pWal);
int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t* lastVer);
int32_t walCheckAndRepairMeta(SWal* pWal);
int64_t walChangeWrite(SWal* pWal, int64_t ver);
int32_t walCheckAndRepairIdx(SWal* pWal);

View File

@ -57,18 +57,10 @@ FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t*
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
int64_t fileSize = 0;
if (taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0) {
wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
code = terrno;
goto _err;
}
TAOS_CHECK_GOTO(taosStatFile(fnameStr, &fileSize, NULL, NULL), &lino, _err);
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
if (pFile == NULL) {
wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
*lastVer = retVer;
TAOS_RETURN(terrno);
}
TSDB_CHECK_NULL(pFile, code, lino, _err, terrno);
// ensure size as non-negative
pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize);
@ -102,9 +94,7 @@ FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t*
capacity = readSize + sizeof(magic);
ptr = taosMemoryRealloc(buf, capacity);
if (ptr == NULL) {
TAOS_CHECK_GOTO(terrno, &lino, _err);
}
TSDB_CHECK_NULL(ptr, code, lino, _err, terrno);
buf = ptr;
int64_t ret = taosLSeekFile(pFile, offset, SEEK_SET);
@ -166,9 +156,7 @@ FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t*
if (capacity < readSize + extraSize + sizeof(magic)) {
capacity += extraSize;
void* ptr = taosMemoryRealloc(buf, capacity);
if (ptr == NULL) {
TAOS_CHECK_GOTO(terrno, &lino, _err);
}
TSDB_CHECK_NULL(ptr, code, lino, _err, terrno);
buf = ptr;
}
int64_t ret = taosLSeekFile(pFile, offset + readSize, SEEK_SET);
@ -187,10 +175,7 @@ FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t*
}
logContent = (SWalCkHead*)(buf + pos);
code = decryptBody(&pWal->cfg, logContent, logContent->head.bodyLen, __FUNCTION__);
if (code) {
break;
}
TAOS_CHECK_GOTO(decryptBody(&pWal->cfg, logContent, logContent->head.bodyLen, __FUNCTION__), &lino, _err);
if (walValidBodyCksum(logContent) != 0) {
code = TSDB_CODE_WAL_CHKSUM_MISMATCH;
@ -233,13 +218,16 @@ FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t*
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pFile) < 0) {
wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr);
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _err);
TAOS_CHECK_GOTO(terrno, &lino, _err);
}
}
pFileInfo->fileSize = lastEntryEndOffset;
_err:
if (code != 0) {
wError("vgId:%d, failed to scan log file due to %s, file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
}
taosCloseFile(&pFile);
taosMemoryFree(buf);
*lastVer = retVer;
@ -371,45 +359,37 @@ static int32_t walLogEntriesComplete(const SWal* pWal) {
}
static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
int32_t code = TSDB_CODE_SUCCESS;
TdFilePtr pFile = NULL;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
TdFilePtr pFile = NULL;
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
if (!pFileInfo) {
TAOS_RETURN(TSDB_CODE_FAILED);
}
TSDB_CHECK_NULL(pFileInfo, code, lino, _exit, terrno);
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
int64_t fileSize = 0;
if (taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0) {
wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
code = terrno;
goto _exit;
}
TAOS_CHECK_EXIT(taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0);
int64_t records = TMAX(0, pFileInfo->lastVer - pFileInfo->firstVer + 1);
int64_t lastEndOffset = records * sizeof(SWalIdxEntry);
if (fileSize <= lastEndOffset) {
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
if (fileSize <= lastEndOffset) TAOS_RETURN(TSDB_CODE_SUCCESS);
pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
if (pFile == NULL) {
code = terrno;
goto _exit;
}
TSDB_CHECK_NULL(pFile, code, lino, _exit, terrno);
wInfo("vgId:%d, trim idx file. file: %s, size: %" PRId64 ", offset: %" PRId64, pWal->cfg.vgId, fnameStr, fileSize,
lastEndOffset);
code = taosFtruncateFile(pFile, lastEndOffset);
if (code < 0) {
wError("vgId:%d, failed to truncate file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
goto _exit;
}
TAOS_CHECK_EXIT(taosFtruncateFile(pFile, lastEndOffset));
_exit:
if (code != TSDB_CODE_SUCCESS) {
wError("vgId:%d, failed to trim idx file %s due to %s", pWal->cfg.vgId, fnameStr, tstrerror(code));
}
(void)taosCloseFile(&pFile);
TAOS_RETURN(code);
}
@ -425,35 +405,34 @@ static void printFileSet(int32_t vgId, SArray* fileSet, const char* str) {
}
}
void walRegfree(regex_t* ptr) {
if (ptr == NULL) {
return;
}
regfree(ptr);
}
int32_t walCheckAndRepairMeta(SWal* pWal) {
// load log files, get first/snapshot/last version info
int32_t code = 0;
int32_t lino = 0;
const char* logPattern = "^[0-9]+.log$";
const char* idxPattern = "^[0-9]+.idx$";
regex_t logRegPattern;
regex_t idxRegPattern;
regex_t logRegPattern, idxRegPattern;
TdDirPtr pDir = NULL;
SArray* actualLog = NULL;
wInfo("vgId:%d, begin to repair meta, wal path:%s, firstVer:%" PRId64 ", lastVer:%" PRId64 ", snapshotVer:%" PRId64,
pWal->cfg.vgId, pWal->path, pWal->vers.firstVer, pWal->vers.lastVer, pWal->vers.snapshotVer);
if (regcomp(&logRegPattern, logPattern, REG_EXTENDED) != 0) {
wError("failed to compile log pattern, error:%s", tstrerror(terrno));
return terrno;
}
if (regcomp(&idxRegPattern, idxPattern, REG_EXTENDED) != 0) {
wError("failed to compile idx pattern");
return terrno;
}
TAOS_CHECK_EXIT_SET_CODE(regcomp(&logRegPattern, logPattern, REG_EXTENDED), code, terrno);
TdDirPtr pDir = taosOpenDir(pWal->path);
if (pDir == NULL) {
regfree(&logRegPattern);
regfree(&idxRegPattern);
wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
return terrno;
}
TAOS_CHECK_EXIT_SET_CODE(regcomp(&idxRegPattern, idxPattern, REG_EXTENDED), code, terrno);
SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
pDir = taosOpenDir(pWal->path);
TSDB_CHECK_NULL(pDir, code, lino, _exit, terrno);
actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
// scan log files and build new meta
TdDirEntryPtr pDirEntry;
@ -464,28 +443,10 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
SWalFileInfo fileInfo;
(void)memset(&fileInfo, -1, sizeof(SWalFileInfo));
(void)sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
if (!taosArrayPush(actualLog, &fileInfo)) {
regfree(&logRegPattern);
regfree(&idxRegPattern);
int32_t ret = taosCloseDir(&pDir);
if (ret != 0) {
wError("failed to close dir, ret:%s", tstrerror(ret));
return terrno;
}
return terrno;
}
TSDB_CHECK_NULL(taosArrayPush(actualLog, &fileInfo), code, lino, _exit, terrno);
}
}
int32_t ret = taosCloseDir(&pDir);
if (ret != 0) {
wError("failed to close dir, ret:%s", tstrerror(ret));
return terrno;
}
regfree(&logRegPattern);
regfree(&idxRegPattern);
taosArraySort(actualLog, compareWalFileInfo);
wInfo("vgId:%d, actual log file, wal path:%s, num:%d", pWal->cfg.vgId, pWal->path,
@ -500,11 +461,7 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
bool updateMeta = (metaFileNum != actualFileNum);
// rebuild meta of file info
code = walRebuildFileInfoSet(pWal->fileInfoSet, actualLog);
taosArrayDestroy(actualLog);
if (code) {
TAOS_RETURN(code);
}
TAOS_CHECK_EXIT(walRebuildFileInfoSet(pWal->fileInfoSet, actualLog));
wInfo("vgId:%d, log file in meta, wal path:%s, num:%d", pWal->cfg.vgId, pWal->path,
(int32_t)taosArrayGetSize(pWal->fileInfoSet));
@ -521,12 +478,7 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
int32_t code = taosStatFile(fnameStr, &fileSize, NULL, NULL);
if (code < 0) {
wError("failed to stat file since %s. file:%s", terrstr(), fnameStr);
TAOS_RETURN(terrno);
}
TAOS_CHECK_EXIT(taosStatFile(fnameStr, &fileSize, NULL, NULL));
if (pFileInfo->lastVer >= pFileInfo->firstVer && fileSize == pFileInfo->fileSize) {
totSize += pFileInfo->fileSize;
@ -581,22 +533,24 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
wInfo("vgId:%d, success to repair meta, wal path:%s, firstVer:%" PRId64 ", lastVer:%" PRId64 ", snapshotVer:%" PRId64,
pWal->cfg.vgId, pWal->path, pWal->vers.firstVer, pWal->vers.lastVer, pWal->vers.snapshotVer);
_exit:
if (code != TSDB_CODE_SUCCESS) {
wError("vgId:%d, failed to repair meta due to %s, at line:%d", pWal->cfg.vgId, tstrerror(code), lino);
}
taosArrayDestroy(actualLog);
TAOS_UNUSED(taosCloseDir(&pDir));
walRegfree(&logRegPattern);
walRegfree(&idxRegPattern);
return code;
}
static int32_t walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) {
if (taosLSeekFile(pLogFile, offset, SEEK_SET) < 0) {
TAOS_RETURN(terrno);
}
if (taosLSeekFile(pLogFile, offset, SEEK_SET) < 0) return terrno;
if (taosReadFile(pLogFile, pCkHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
TAOS_RETURN(terrno);
}
if (taosReadFile(pLogFile, pCkHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) return terrno;
if (walValidHeadCksum(pCkHead) != 0) {
TAOS_RETURN(TSDB_CODE_WAL_CHKSUM_MISMATCH);
}
if (walValidHeadCksum(pCkHead) != 0) return TSDB_CODE_WAL_CHKSUM_MISMATCH;
return TSDB_CODE_SUCCESS;
}
@ -838,29 +792,17 @@ int32_t walMetaSerialize(SWal* pWal, char** serialized) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
if (!cJSON_AddItemToObject(pRoot, "meta", pMeta)) {
wInfo("vgId:%d, failed to add meta to root", pWal->cfg.vgId);
}
if (!cJSON_AddItemToObject(pRoot, "meta", pMeta)) goto _err;
snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.firstVer);
if (cJSON_AddStringToObject(pMeta, "firstVer", buf) == NULL) {
wInfo("vgId:%d, failed to add firstVer to meta", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pMeta, "firstVer", buf) == NULL) goto _err;
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.snapshotVer);
if (cJSON_AddStringToObject(pMeta, "snapshotVer", buf) == NULL) {
wInfo("vgId:%d, failed to add snapshotVer to meta", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pMeta, "snapshotVer", buf) == NULL) goto _err;
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.commitVer);
if (cJSON_AddStringToObject(pMeta, "commitVer", buf) == NULL) {
wInfo("vgId:%d, failed to add commitVer to meta", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pMeta, "commitVer", buf) == NULL) goto _err;
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.lastVer);
if (cJSON_AddStringToObject(pMeta, "lastVer", buf) == NULL) {
wInfo("vgId:%d, failed to add lastVer to meta", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pMeta, "lastVer", buf) == NULL) goto _err;
if (!cJSON_AddItemToObject(pRoot, "files", pFiles)) {
wInfo("vgId:%d, failed to add files to root", pWal->cfg.vgId);
}
if (!cJSON_AddItemToObject(pRoot, "files", pFiles)) goto _err;
SWalFileInfo* pData = pWal->fileInfoSet->pData;
for (int i = 0; i < sz; i++) {
SWalFileInfo* pInfo = &pData[i];
@ -869,31 +811,20 @@ int32_t walMetaSerialize(SWal* pWal, char** serialized) {
}
if (pField == NULL) {
cJSON_Delete(pRoot);
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
// cjson only support int32_t or double
// string are used to prohibit the loss of precision
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->firstVer);
if (cJSON_AddStringToObject(pField, "firstVer", buf) == NULL) {
wInfo("vgId:%d, failed to add firstVer to field", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pField, "firstVer", buf) == NULL) goto _err;
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->lastVer);
if (cJSON_AddStringToObject(pField, "lastVer", buf) == NULL) {
wInfo("vgId:%d, failed to add lastVer to field", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pField, "lastVer", buf) == NULL) goto _err;
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->createTs);
if (cJSON_AddStringToObject(pField, "createTs", buf) == NULL) {
wInfo("vgId:%d, failed to add createTs to field", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pField, "createTs", buf) == NULL) goto _err;
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->closeTs);
if (cJSON_AddStringToObject(pField, "closeTs", buf) == NULL) {
wInfo("vgId:%d, failed to add closeTs to field", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pField, "closeTs", buf) == NULL) goto _err;
(void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->fileSize);
if (cJSON_AddStringToObject(pField, "fileSize", buf) == NULL) {
wInfo("vgId:%d, failed to add fileSize to field", pWal->cfg.vgId);
}
if (cJSON_AddStringToObject(pField, "fileSize", buf) == NULL) goto _err;
}
char* pSerialized = cJSON_Print(pRoot);
cJSON_Delete(pRoot);
@ -901,6 +832,9 @@ int32_t walMetaSerialize(SWal* pWal, char** serialized) {
*serialized = pSerialized;
TAOS_RETURN(TSDB_CODE_SUCCESS);
_err:
cJSON_Delete(pRoot);
return TSDB_CODE_FAILED;
}
int32_t walMetaDeserialize(SWal* pWal, const char* bytes) {
@ -1109,8 +1043,12 @@ _err:
}
int32_t walLoadMeta(SWal* pWal) {
int32_t code = 0;
int n = 0;
int32_t code = 0;
int n = 0;
int32_t lino = 0;
char* buf = NULL;
TdFilePtr pFile = NULL;
// find existing meta file
int metaVer = walFindCurMetaVer(pWal);
if (metaVer == -1) {
@ -1125,11 +1063,7 @@ int32_t walLoadMeta(SWal* pWal) {
}
// read metafile
int64_t fileSize = 0;
if (taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0) {
wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
code = terrno;
TAOS_RETURN(code);
}
TAOS_CHECK_EXIT(taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0);
if (fileSize == 0) {
code = taosRemoveFile(fnameStr);
if (code) {
@ -1141,37 +1075,37 @@ int32_t walLoadMeta(SWal* pWal) {
TAOS_RETURN(TSDB_CODE_FAILED);
}
int size = (int)fileSize;
char* buf = taosMemoryMalloc(size + 5);
if (buf == NULL) {
TAOS_RETURN(terrno);
}
int size = (int)fileSize;
buf = taosMemoryMalloc(size + 5);
TSDB_CHECK_NULL(buf, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
(void)memset(buf, 0, size + 5);
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ);
if (pFile == NULL) {
taosMemoryFree(buf);
pFile = taosOpenFile(fnameStr, TD_FILE_READ);
TSDB_CHECK_NULL(pFile, code, lino, _exit, terrno);
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
}
if (taosReadFile(pFile, buf, size) != size) {
(void)taosCloseFile(&pFile);
taosMemoryFree(buf);
TAOS_RETURN(terrno);
code = terrno;
goto _exit;
}
// load into fileInfoSet
code = walMetaDeserialize(pWal, buf);
if (code < 0) {
wError("failed to deserialize wal meta. file:%s", fnameStr);
code = TSDB_CODE_WAL_FILE_CORRUPTED;
}
(void)taosCloseFile(&pFile);
taosMemoryFree(buf);
wInfo("vgId:%d, meta file loaded: %s, firstVer:%" PRId64 ", lastVer:%" PRId64 ", fileInfoSet size:%d", pWal->cfg.vgId,
fnameStr, pWal->vers.firstVer, pWal->vers.lastVer, (int32_t)taosArrayGetSize(pWal->fileInfoSet));
printFileSet(pWal->cfg.vgId, pWal->fileInfoSet, "file in meta");
_exit:
if (code != TSDB_CODE_SUCCESS) {
wError("vgId:%d, failed to load meta file due to %s, at line:%d", pWal->cfg.vgId, tstrerror(code), lino);
}
taosMemoryFree(buf);
(void)taosCloseFile(&pFile);
TAOS_RETURN(code);
}

View File

@ -92,18 +92,15 @@ void walApplyVer(SWal *pWal, int64_t ver) {
}
int32_t walCommit(SWal *pWal, int64_t ver) {
if (ver < pWal->vers.commitVer) {
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
if (ver > pWal->vers.lastVer || pWal->vers.commitVer < pWal->vers.snapshotVer) {
TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
}
if (ver < pWal->vers.commitVer) TAOS_RETURN(TSDB_CODE_SUCCESS);
if (ver > pWal->vers.lastVer || pWal->vers.commitVer < pWal->vers.snapshotVer) TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
pWal->vers.commitVer = ver;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
static int64_t walChangeWrite(SWal *pWal, int64_t ver) {
int64_t walChangeWrite(SWal *pWal, int64_t ver) {
int code;
TdFilePtr pIdxTFile, pLogTFile;
char fnameStr[WAL_FILE_LEN];
@ -161,6 +158,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver);
int32_t code = 0;
int32_t lino = 0;
int64_t ret;
char fnameStr[WAL_FILE_LEN];
TdFilePtr pIdxFile = NULL, pLogFile = NULL;
@ -172,11 +170,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// find correct file
if (ver < walGetLastFileFirstVer(pWal)) {
// change current files
ret = walChangeWrite(pWal, ver);
if (ret < 0) {
code = terrno;
goto _exit;
}
TAOS_CHECK_EXIT_SET_CODE(walChangeWrite(pWal, ver), code, terrno);
// delete files in descending order
int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
@ -198,10 +192,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pIdxFile == NULL) {
code = terrno;
goto _exit;
}
TSDB_CHECK_NULL(pIdxFile, code, lino, _exit, terrno);
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
ret = taosLSeekFile(pIdxFile, idxOff, SEEK_SET);
if (ret < 0) {
@ -218,11 +209,8 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, fnameStr);
if (pLogFile == NULL) {
// TODO
code = terrno;
goto _exit;
}
TSDB_CHECK_NULL(pLogFile, code, lino, _exit, terrno);
ret = taosLSeekFile(pLogFile, entry.offset, SEEK_SET);
if (ret < 0) {
// TODO
@ -238,35 +226,26 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
}
code = walValidHeadCksum(&head);
if (code != 0) {
code = TSDB_CODE_WAL_FILE_CORRUPTED;
goto _exit;
}
if (head.head.version != ver) {
if (code != 0 || head.head.version != ver) {
code = TSDB_CODE_WAL_FILE_CORRUPTED;
goto _exit;
}
// truncate old files
code = taosFtruncateFile(pLogFile, entry.offset);
if (code < 0) {
goto _exit;
}
code = taosFtruncateFile(pIdxFile, idxOff);
if (code < 0) {
goto _exit;
}
if ((code = taosFtruncateFile(pLogFile, entry.offset)) < 0) goto _exit;
if ((code = taosFtruncateFile(pIdxFile, idxOff)) < 0) goto _exit;
pWal->vers.lastVer = ver - 1;
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
code = walSaveMeta(pWal);
if (code < 0) {
wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, terrstr());
goto _exit;
}
TAOS_CHECK_EXIT(walSaveMeta(pWal));
_exit:
if (code != 0) {
wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
}
TAOS_UNUSED(taosCloseFile(&pIdxFile));
TAOS_UNUSED(taosCloseFile(&pLogFile));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
@ -274,7 +253,7 @@ _exit:
TAOS_RETURN(code);
}
static int32_t walRollImpl(SWal *pWal) {
int32_t walRollImpl(SWal *pWal) {
int32_t code = 0, lino = 0;
if (pWal->cfg.level == TAOS_WAL_SKIP && pWal->pIdxFile != NULL && pWal->pLogFile != NULL) {
@ -306,15 +285,12 @@ static int32_t walRollImpl(SWal *pWal) {
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, newFileFirstVer, fnameStr);
pIdxFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pIdxFile == NULL) {
TAOS_CHECK_GOTO(terrno, &lino, _exit);
}
TSDB_CHECK_NULL(pIdxFile, code, lino, _exit, terrno);
walBuildLogName(pWal, newFileFirstVer, fnameStr);
pLogFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
wDebug("vgId:%d, wal create new file for write:%s", pWal->cfg.vgId, fnameStr);
if (pLogFile == NULL) {
TAOS_CHECK_GOTO(terrno, &lino, _exit);
}
TSDB_CHECK_NULL(pLogFile, code, lino, _exit, terrno);
TAOS_CHECK_GOTO(walRollFileInfo(pWal), &lino, _exit);
@ -358,6 +334,7 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) {
int32_t code = 0;
int32_t lino = 0;
if (pWal->cfg.level == TAOS_WAL_SKIP) {
TAOS_RETURN(TSDB_CODE_SUCCESS);
@ -375,16 +352,13 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) {
", last ver %" PRId64,
pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer);
// check file rolling
if (walGetLastFileSize(pWal) != 0) {
if ((code = walRollImpl(pWal)) < 0) {
wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr());
goto _exit;
}
}
if (walGetLastFileSize(pWal) != 0 && (code = walRollImpl(pWal)) < 0) goto _exit;
_exit:
if (code) {
wError("vgId:%d, %s failed since %s at line %d", pWal->cfg.vgId, __func__, tstrerror(code), lino);
}
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(code);
}
@ -515,6 +489,13 @@ _exit:
return code;
}
static void walStopDnode(SWal *pWal) {
if (pWal->stopDnode != NULL) {
wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId);
pWal->stopDnode();
}
}
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
int32_t code = 0;
@ -528,12 +509,7 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
int64_t size = taosWriteFile(pWal->pIdxFile, &entry, sizeof(SWalIdxEntry));
if (size != sizeof(SWalIdxEntry)) {
wError("vgId:%d, failed to write idx entry due to %s. ver:%" PRId64, pWal->cfg.vgId, strerror(errno), ver);
if (pWal->stopDnode != NULL) {
wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId);
pWal->stopDnode();
}
walStopDnode(pWal);
TAOS_RETURN(terrno);
}
@ -579,12 +555,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
code = terrno;
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
if (pWal->stopDnode != NULL) {
wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId);
pWal->stopDnode();
}
walStopDnode(pWal);
TAOS_CHECK_GOTO(code, &lino, _exit);
}
@ -597,12 +568,8 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
cyptedBodyLen = ENCRYPTED_LEN(cyptedBodyLen);
newBody = taosMemoryMalloc(cyptedBodyLen);
if (newBody == NULL) {
wError("vgId:%d, file:%" PRId64 ".log, failed to malloc since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
TSDB_CHECK_NULL(newBody, code, lino, _exit, terrno);
TAOS_CHECK_GOTO(terrno, &lino, _exit);
}
(void)memset(newBody, 0, cyptedBodyLen);
(void)memcpy(newBody, body, plainBodyLen);
@ -641,10 +608,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
taosMemoryFreeClear(newBodyEncrypted);
}
if (pWal->stopDnode != NULL) {
wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId);
pWal->stopDnode();
}
walStopDnode(pWal);
TAOS_CHECK_GOTO(code, &lino, _exit);
}
@ -652,8 +616,6 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
taosMemoryFreeClear(newBody);
taosMemoryFreeClear(newBodyEncrypted);
// wInfo("vgId:%d, free newBody newBodyEncrypted %s",
// pWal->cfg.vgId, __FUNCTION__);
}
// set status
@ -668,6 +630,10 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
return 0;
_exit:
if (code) {
wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
}
// recover in a reverse order
if (taosFtruncateFile(pWal->pLogFile, offset) < 0) {
wFatal("vgId:%d, failed to recover WAL logfile from write error since %s, offset:%" PRId64, pWal->cfg.vgId,

View File

@ -627,6 +627,68 @@ TEST_F(WalKeepEnv, walRollback) {
ASSERT_EQ(code, 0);
}
TEST_F(WalKeepEnv, walChangeWrite) {
walResetEnv();
int code;
int i;
for (i = 0; i < 100; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0);
}
code = walChangeWrite(pWal, 50);
ASSERT_EQ(code, 0);
}
TEST_F(WalCleanEnv, walRepairLogFileTs2) {
int code;
int i;
for (i = 0; i < 100; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0);
}
code = walRollImpl(pWal);
ASSERT_EQ(code, 0);
for (i = 100; i < 200; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0);
}
code = walRollImpl(pWal);
ASSERT_EQ(code, 0);
for (i = 200; i < 300; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0);
}
code = walRollImpl(pWal);
ASSERT_EQ(code, 0);
// Try to step in ts repair logic.
SWalFileInfo* pFileInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 2);
pFileInfo->closeTs = -1;
code = walCheckAndRepairMeta(pWal);
ASSERT_EQ(code, 0);
}
TEST_F(WalRetentionEnv, repairMeta1) {
walResetEnv();
int code;

View File

@ -46,24 +46,29 @@ int32_t cfgSetItemVal(SConfigItem *pItem, const char *name, const char *value, E
extern char **environ;
int32_t cfgInit(SConfig **ppCfg) {
SConfig *pCfg = taosMemoryCalloc(1, sizeof(SConfig));
if (pCfg == NULL) {
TAOS_RETURN(terrno);
}
int32_t code = 0;
int32_t lino = 0;
SConfig *pCfg = NULL;
pCfg = taosMemoryCalloc(1, sizeof(SConfig));
if (pCfg == NULL) return terrno;
pCfg->localArray = NULL, pCfg->globalArray = NULL;
pCfg->localArray = taosArrayInit(64, sizeof(SConfigItem));
if (pCfg->localArray == NULL) {
taosMemoryFree(pCfg);
TAOS_RETURN(terrno);
}
TSDB_CHECK_NULL(pCfg->localArray, code, lino, _exit, terrno);
pCfg->globalArray = taosArrayInit(64, sizeof(SConfigItem));
if (pCfg->globalArray == NULL) {
taosMemoryFree(pCfg);
TAOS_RETURN(terrno);
}
TSDB_CHECK_NULL(pCfg->globalArray, code, lino, _exit, terrno);
TAOS_CHECK_RETURN(taosThreadMutexInit(&pCfg->lock, NULL));
*ppCfg = pCfg;
_exit:
if (code != 0) {
uError("failed to init config, since %s ,at line %d", tstrerror(code), lino);
cfgCleanup(pCfg);
}
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -187,14 +192,11 @@ int32_t cfgGetGlobalSize(SConfig *pCfg) { return taosArrayGetSize(pCfg->globalAr
static int32_t cfgCheckAndSetConf(SConfigItem *pItem, const char *conf) {
cfgItemFreeVal(pItem);
if (!(pItem->str == NULL)) {
return TSDB_CODE_INVALID_PARA;
}
if (!(pItem->str == NULL)) return TSDB_CODE_INVALID_PARA;
pItem->str = taosStrdup(conf);
if (pItem->str == NULL) {
TAOS_RETURN(terrno);
}
if (pItem->str == NULL) return terrno;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -209,9 +211,8 @@ static int32_t cfgCheckAndSetDir(SConfigItem *pItem, const char *inputDir) {
taosMemoryFreeClear(pItem->str);
pItem->str = taosStrdup(fullDir);
if (pItem->str == NULL) {
TAOS_RETURN(terrno);
}
if (pItem->str == NULL) return terrno;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -219,9 +220,8 @@ static int32_t cfgCheckAndSetDir(SConfigItem *pItem, const char *inputDir) {
static int32_t cfgSetBool(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
int32_t code = 0;
bool tmp = false;
if (strcasecmp(value, "true") == 0) {
tmp = true;
}
if (strcasecmp(value, "true") == 0) tmp = true;
int32_t val = 0;
if ((code = taosStr2int32(value, &val)) == 0 && val > 0) {
tmp = true;
@ -441,9 +441,7 @@ static int32_t cfgUpdateDebugFlagItem(SConfig *pCfg, const char *name, bool rese
// logflag names that should 'not' be set by 'debugFlag'
if (pDebugFlagItem->array == NULL) {
pDebugFlagItem->array = taosArrayInit(16, sizeof(SLogVar));
if (pDebugFlagItem->array == NULL) {
TAOS_RETURN(terrno);
}
if (pDebugFlagItem->array == NULL) return terrno;
}
taosArrayClear(pDebugFlagItem->array);
TAOS_RETURN(TSDB_CODE_SUCCESS);
@ -454,9 +452,7 @@ static int32_t cfgUpdateDebugFlagItem(SConfig *pCfg, const char *name, bool rese
if (pDebugFlagItem->array != NULL) {
SLogVar logVar = {0};
tstrncpy(logVar.name, name, TSDB_LOG_VAR_LEN);
if (NULL == taosArrayPush(pDebugFlagItem->array, &logVar)) {
TAOS_RETURN(terrno);
}
if (NULL == taosArrayPush(pDebugFlagItem->array, &logVar)) return terrno;
}
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -518,9 +514,8 @@ _exit:
int32_t cfgSetItemVal(SConfigItem *pItem, const char *name, const char *value, ECfgSrcType stype) {
int32_t code = TSDB_CODE_SUCCESS;
if (pItem == NULL) {
TAOS_RETURN(TSDB_CODE_CFG_NOT_FOUND);
}
if (pItem == NULL) return TSDB_CODE_CFG_NOT_FOUND;
switch (pItem->dtype) {
case CFG_DTYPE_BOOL: {
code = cfgSetBool(pItem, value, stype);
@ -589,10 +584,7 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *pName) {
}
void cfgLock(SConfig *pCfg) {
if (pCfg == NULL) {
return;
}
if (pCfg == NULL) return;
(void)taosThreadMutexLock(&pCfg->lock);
}
@ -600,7 +592,7 @@ void cfgUnLock(SConfig *pCfg) { (void)taosThreadMutexUnlock(&pCfg->lock); }
int32_t checkItemDyn(SConfigItem *pItem, bool isServer) {
if (pItem->dynScope == CFG_DYN_NONE) {
return TSDB_CODE_SUCCESS;
return TSDB_CODE_INVALID_CFG;
}
if (isServer) {
if (pItem->dynScope == CFG_DYN_CLIENT || pItem->dynScope == CFG_DYN_CLIENT_LAZY) {
@ -617,39 +609,33 @@ int32_t checkItemDyn(SConfigItem *pItem, bool isServer) {
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer,
CfgAlterType alterType) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
cfgLock(pCfg);
SConfigItem *pItem = cfgGetItem(pCfg, name);
if (pItem == NULL) {
cfgUnLock(pCfg);
TAOS_RETURN(TSDB_CODE_CFG_NOT_FOUND);
}
int32_t code = checkItemDyn(pItem, isServer);
if (code != TSDB_CODE_SUCCESS) {
cfgUnLock(pCfg);
TAOS_RETURN(code);
}
TSDB_CHECK_NULL(pItem, code, lino, _exit, TSDB_CODE_CFG_NOT_FOUND);
TAOS_CHECK_EXIT(checkItemDyn(pItem, isServer));
if ((pItem->category == CFG_CATEGORY_GLOBAL) && alterType == CFG_ALTER_DNODE) {
uError("failed to config:%s, not support update global config on only one dnode", name);
cfgUnLock(pCfg);
TAOS_RETURN(TSDB_CODE_INVALID_CFG);
code = TSDB_CODE_INVALID_CFG;
goto _exit;
}
switch (pItem->dtype) {
case CFG_DTYPE_STRING: {
if (strcasecmp(name, "slowLogScope") == 0) {
char *tmp = taosStrdup(pVal);
if (!tmp) {
cfgUnLock(pCfg);
uError("failed to config:%s since %s", name, terrstr());
TAOS_RETURN(terrno);
code = terrno;
goto _exit;
}
int32_t scope = 0;
int32_t code = taosSetSlowLogScope(tmp, &scope);
code = taosSetSlowLogScope(tmp, &scope);
if (TSDB_CODE_SUCCESS != code) {
cfgUnLock(pCfg);
taosMemoryFree(tmp);
TAOS_RETURN(code);
goto _exit;
}
taosMemoryFree(tmp);
}
@ -659,13 +645,13 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
code = taosStr2int32(pVal, &ival);
if (code != 0 || (ival != 0 && ival != 1)) {
uError("cfg:%s, type:%s value:%d out of range[0, 1]", pItem->name, cfgDtypeStr(pItem->dtype), ival);
cfgUnLock(pCfg);
TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE);
code = TSDB_CODE_OUT_OF_RANGE;
goto _exit;
}
} break;
case CFG_DTYPE_INT32: {
int32_t ival;
int32_t code = (int32_t)taosStrHumanToInt32(pVal, &ival);
code = (int32_t)taosStrHumanToInt32(pVal, &ival);
if (code != TSDB_CODE_SUCCESS) {
cfgUnLock(pCfg);
return code;
@ -673,13 +659,13 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
if (ival < pItem->imin || ival > pItem->imax) {
uError("cfg:%s, type:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
cfgUnLock(pCfg);
TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE);
code = TSDB_CODE_OUT_OF_RANGE;
goto _exit;
}
} break;
case CFG_DTYPE_INT64: {
int64_t ival;
int32_t code = taosStrHumanToInt64(pVal, &ival);
code = taosStrHumanToInt64(pVal, &ival);
if (code != TSDB_CODE_SUCCESS) {
cfgUnLock(pCfg);
TAOS_RETURN(code);
@ -687,31 +673,32 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
if (ival < pItem->imin || ival > pItem->imax) {
uError("cfg:%s, type:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
cfgUnLock(pCfg);
TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE);
code = TSDB_CODE_OUT_OF_RANGE;
goto _exit;
}
} break;
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE: {
float dval = 0;
int32_t code = parseCfgReal(pVal, &dval);
if (code != TSDB_CODE_SUCCESS) {
cfgUnLock(pCfg);
TAOS_RETURN(code);
}
float dval = 0;
TAOS_CHECK_EXIT(parseCfgReal(pVal, &dval));
if (dval < pItem->fmin || dval > pItem->fmax) {
uError("cfg:%s, type:%s value:%g out of range[%g, %g]", pItem->name, cfgDtypeStr(pItem->dtype), dval,
pItem->fmin, pItem->fmax);
cfgUnLock(pCfg);
TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE);
code = TSDB_CODE_OUT_OF_RANGE;
goto _exit;
}
} break;
default:
break;
}
_exit:
if (code != TSDB_CODE_SUCCESS) {
uError("failed to check range for cfg:%s, value:%s, since %s at line:%d", name, pVal, tstrerror(code), __LINE__);
}
cfgUnLock(pCfg);
TAOS_RETURN(TSDB_CODE_SUCCESS);
TAOS_RETURN(code);
}
static int32_t cfgAddItem(SConfig *pCfg, SConfigItem *pItem, const char *name) {
@ -720,9 +707,7 @@ static int32_t cfgAddItem(SConfig *pCfg, SConfigItem *pItem, const char *name) {
pItem->stype = CFG_STYPE_DEFAULT;
pItem->name = taosStrdup(name);
if (pItem->name == NULL) {
TAOS_RETURN(terrno);
}
if (pItem->name == NULL) return terrno;
int32_t size = taosArrayGetSize(array);
for (int32_t i = 0; i < size; ++i) {
@ -819,9 +804,8 @@ int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, in
int8_t category) {
SConfigItem item = {.dtype = CFG_DTYPE_STRING, .scope = scope, .dynScope = dynScope, .category = category};
item.str = taosStrdup(defaultVal);
if (item.str == NULL) {
TAOS_RETURN(terrno);
}
if (item.str == NULL) return terrno;
return cfgAddItem(pCfg, &item, name);
}
@ -943,13 +927,9 @@ int32_t cfgDumpItemValue(SConfigItem *pItem, char *buf, int32_t bufSize, int32_t
break;
}
if (len < 0) {
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
}
if (len < 0) return terrno;
if (len > bufSize) {
len = bufSize;
}
if (len > bufSize) len = bufSize;
*pLen = len;
TAOS_RETURN(TSDB_CODE_SUCCESS);
@ -1310,9 +1290,7 @@ int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *envFile) {
}
TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
TAOS_RETURN(terrno);
}
if (pFile == NULL) return terrno;
while (!taosEOFFile(pFile)) {
name = value = value2 = value3 = value4 = NULL;
@ -1466,8 +1444,10 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
}
int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
char *cfgLineBuf = NULL, *name, *value, *value2, *value3, *value4;
SJson *pJson = NULL;
char *cfgLineBuf = NULL, *buf = NULL, *name, *value, *value2, *value3, *value4;
SJson *pJson = NULL;
TdFilePtr pFile = NULL;
int32_t olen, vlen, vlen2, vlen3, vlen4;
int32_t code = 0, lino = 0;
if (url == NULL || strlen(url) == 0) {
@ -1490,36 +1470,28 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
}
TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_READ);
if (pFile == NULL) {
TAOS_CHECK_EXIT(terrno);
}
TSDB_CHECK_NULL(pFile, code, lino, _exit, terrno);
size_t fileSize = taosLSeekFile(pFile, 0, SEEK_END);
if (fileSize <= 0) {
(void)taosCloseFile(&pFile);
(void)printf("load json file error: %s\n", filepath);
TAOS_CHECK_EXIT(terrno);
}
char *buf = taosMemoryMalloc(fileSize + 1);
if (!buf) {
(void)taosCloseFile(&pFile);
(void)printf("load json file error: %s, failed to alloc memory\n", filepath);
TAOS_RETURN(terrno);
code = terrno;
goto _exit;
}
buf = taosMemoryMalloc(fileSize + 1);
TSDB_CHECK_NULL(buf, code, lino, _exit, terrno);
buf[fileSize] = 0;
if (taosLSeekFile(pFile, 0, SEEK_SET) < 0) {
(void)taosCloseFile(&pFile);
(void)printf("load json file error: %s\n", filepath);
taosMemoryFreeClear(buf);
TAOS_RETURN(terrno);
code = terrno;
goto _exit;
}
if (taosReadFile(pFile, buf, fileSize) <= 0) {
(void)taosCloseFile(&pFile);
(void)printf("load json file error: %s\n", filepath);
taosMemoryFreeClear(buf);
TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
code = TSDB_CODE_INVALID_DATA_FMT;
goto _exit;
}
(void)taosCloseFile(&pFile);
pJson = tjsonParse(buf);
if (NULL == pJson) {
const char *jsonParseError = tjsonGetError();
@ -1529,7 +1501,6 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
taosMemoryFreeClear(buf);
TAOS_CHECK_EXIT(TSDB_CODE_INVALID_DATA_FMT);
}
taosMemoryFreeClear(buf);
int32_t jsonArraySize = tjsonGetArraySize(pJson);
for (int32_t i = 0; i < jsonArraySize; i++) {
@ -1596,17 +1567,20 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
TAOS_RETURN(TSDB_CODE_INVALID_PARA);
}
taosMemoryFree(cfgLineBuf);
uInfo("load from apoll url not implemented yet");
TAOS_RETURN(TSDB_CODE_SUCCESS);
_exit:
taosMemoryFree(cfgLineBuf);
taosMemoryFree(buf);
(void)taosCloseFile(&pFile);
tjsonDelete(pJson);
if (code != 0) {
(void)printf("failed to load from apollo url:%s at line %d since %s\n", url, lino, tstrerror(code));
if (code == TSDB_CODE_CFG_NOT_FOUND) {
uTrace("load from apoll url success");
TAOS_RETURN(TSDB_CODE_SUCCESS);
} else {
(void)printf("failed to load from apoll url:%s at line %d since %s\n", url, lino, tstrerror(code));
TAOS_RETURN(code);
}
TAOS_RETURN(code);
}
int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char *apolloUrl) {
@ -1701,9 +1675,7 @@ struct SConfigIter {
int32_t cfgCreateIter(SConfig *pConf, SConfigIter **ppIter) {
SConfigIter *pIter = taosMemoryCalloc(1, sizeof(SConfigIter));
if (pIter == NULL) {
TAOS_RETURN(terrno);
}
if (pIter == NULL) return terrno;
pIter->pConf = pConf;
@ -1721,14 +1693,10 @@ SConfigItem *cfgNextIter(SConfigIter *pIter) {
}
void cfgDestroyIter(SConfigIter *pIter) {
if (pIter == NULL) {
return;
}
if (pIter == NULL) return;
taosMemoryFree(pIter);
}
SArray *taosGetLocalCfg(SConfig *pCfg) { return pCfg->localArray; }
SArray *taosGetGlobalCfg(SConfig *pCfg) { return pCfg->globalArray; }
void taosSetLocalCfg(SConfig *pCfg, SArray *pArray) { pCfg->localArray = pArray; };
void taosSetGlobalCfg(SConfig *pCfg, SArray *pArray) { pCfg->globalArray = pArray; };
SArray *taosGetGlobalCfg(SConfig *pCfg) { return pCfg->globalArray; }

View File

@ -337,6 +337,12 @@ TEST_F(CfgTest, cfgLoadFromEnvFile) {
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
ASSERT_NE(pConfig, nullptr);
EXPECT_EQ(cfgAddBool(pConfig, "test_bool", 0, 0, 0, 0), 0);
EXPECT_EQ(cfgAddInt32(pConfig, "test_int32", 1, 0, 16, 0, 0, 0), 0);
EXPECT_EQ(cfgAddInt64(pConfig, "test_int64", 2, 0, 16, 0, 0, 0), 0);
EXPECT_EQ(cfgAddFloat(pConfig, "test_float", 3, 0, 16, 0, 0, 0), 0);
EXPECT_EQ(cfgAddString(pConfig, "test_string", "4", 0, 0, 0), 0);
TdFilePtr envFile = NULL;
const char *envFilePath = TD_TMP_DIR_PATH "envFile";
envFile = taosOpenFile(envFilePath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
@ -355,6 +361,12 @@ TEST_F(CfgTest, cfgLoadFromApollUrl) {
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
ASSERT_NE(pConfig, nullptr);
EXPECT_EQ(cfgAddBool(pConfig, "test_bool", 0, 0, 0, 0), 0);
EXPECT_EQ(cfgAddInt32(pConfig, "test_int32", 1, 0, 16, 0, 0, 0), 0);
EXPECT_EQ(cfgAddInt64(pConfig, "test_int64", 2, 0, 16, 0, 0, 0), 0);
EXPECT_EQ(cfgAddFloat(pConfig, "test_float", 3, 0, 16, 0, 0, 0), 0);
EXPECT_EQ(cfgAddString(pConfig, "test_string", "4", 0, 0, 0), 0);
TdFilePtr jsonFile = NULL;
const char *jsonFilePath = TD_TMP_DIR_PATH "envJson.json";
jsonFile = taosOpenFile(jsonFilePath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);