Merge branch 'FIX/TS-1984-3.0' of github.com:taosdata/TDengine into FIX/TS-1984-3.0
This commit is contained in:
commit
ffa6906a9f
|
@ -125,6 +125,9 @@ extern char tsSmlChildTableName[];
|
||||||
extern char tsSmlTagName[];
|
extern char tsSmlTagName[];
|
||||||
extern bool tsSmlDataFormat;
|
extern bool tsSmlDataFormat;
|
||||||
|
|
||||||
|
// wal
|
||||||
|
extern int64_t tsWalRecoverSizeLimit;
|
||||||
|
|
||||||
// internal
|
// internal
|
||||||
extern int32_t tsTransPullupInterval;
|
extern int32_t tsTransPullupInterval;
|
||||||
extern int32_t tsMqRebalanceInterval;
|
extern int32_t tsMqRebalanceInterval;
|
||||||
|
|
|
@ -43,7 +43,6 @@ extern "C" {
|
||||||
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
||||||
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
|
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
|
||||||
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
|
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
|
||||||
#define WAL_RECOV_SIZE_LIMIT (200 * WAL_SCAN_BUF_SIZE)
|
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TAOS_WAL_WRITE = 1,
|
TAOS_WAL_WRITE = 1,
|
||||||
|
|
|
@ -156,6 +156,9 @@ char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPR
|
||||||
// udf
|
// udf
|
||||||
bool tsStartUdfd = true;
|
bool tsStartUdfd = true;
|
||||||
|
|
||||||
|
// wal
|
||||||
|
int64_t tsWalRecoverSizeLimit = (600 * 1024 * 1024L);
|
||||||
|
|
||||||
// internal
|
// internal
|
||||||
int32_t tsTransPullupInterval = 2;
|
int32_t tsTransPullupInterval = 2;
|
||||||
int32_t tsMqRebalanceInterval = 2;
|
int32_t tsMqRebalanceInterval = 2;
|
||||||
|
@ -422,6 +425,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1;
|
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1;
|
||||||
|
|
||||||
|
if (cfgAddInt64(pCfg, "walRecoverSizeLimit", tsWalRecoverSizeLimit, 3 * 1024 * 1024, INT64_MAX, 0) != 0) return -1;
|
||||||
|
|
||||||
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1;
|
if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1;
|
||||||
GRANT_CFG_ADD;
|
GRANT_CFG_ADD;
|
||||||
|
@ -720,6 +725,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32;
|
tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32;
|
||||||
tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32;
|
tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32;
|
||||||
|
|
||||||
|
tsWalRecoverSizeLimit = cfgGetItem(pCfg, "walRecoverSizeLimit")->i64;
|
||||||
|
|
||||||
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
|
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
|
||||||
tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs));
|
tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs));
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "cJSON.h"
|
#include "cJSON.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
#include "tglobal.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "walInt.h"
|
#include "walInt.h"
|
||||||
|
|
||||||
|
@ -69,15 +70,23 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
|
||||||
uint64_t magic = WAL_MAGIC;
|
uint64_t magic = WAL_MAGIC;
|
||||||
int64_t walCkHeadSz = sizeof(SWalCkHead);
|
int64_t walCkHeadSz = sizeof(SWalCkHead);
|
||||||
int64_t end = fileSize;
|
int64_t end = fileSize;
|
||||||
int64_t offset = 0;
|
|
||||||
int64_t capacity = 0;
|
int64_t capacity = 0;
|
||||||
int64_t readSize = 0;
|
int64_t readSize = 0;
|
||||||
char* buf = NULL;
|
char* buf = NULL;
|
||||||
int64_t found = -1;
|
int64_t found = -1;
|
||||||
bool firstTrial = pFileInfo->fileSize < fileSize;
|
bool firstTrial = pFileInfo->fileSize < fileSize;
|
||||||
int64_t border = TMIN(pFileInfo->fileSize, fileSize);
|
int64_t offset = TMIN(pFileInfo->fileSize, fileSize);
|
||||||
int64_t offsetForward = border - stepSize + walCkHeadSz - 1;
|
int64_t offsetForward = offset - stepSize + walCkHeadSz - 1;
|
||||||
int64_t offsetBackward = border;
|
int64_t offsetBackward = offset;
|
||||||
|
int64_t recoverSize = end - offset;
|
||||||
|
|
||||||
|
if (tsWalRecoverSizeLimit < recoverSize) {
|
||||||
|
wError("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64
|
||||||
|
", end:%" PRId64 ", file:%s",
|
||||||
|
pWal->cfg.vgId, tsWalRecoverSizeLimit, offset, end, fnameStr);
|
||||||
|
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
// search for the valid last WAL entry, e.g. block by block
|
// search for the valid last WAL entry, e.g. block by block
|
||||||
while (1) {
|
while (1) {
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
|
#include "tglobal.h"
|
||||||
#include "walInt.h"
|
#include "walInt.h"
|
||||||
|
|
||||||
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
||||||
|
@ -252,7 +253,7 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walGetLastFileCachedSize(pWal) > WAL_RECOV_SIZE_LIMIT / 2) {
|
if (walGetLastFileCachedSize(pWal) > tsWalRecoverSizeLimit / 2) {
|
||||||
if (walSaveMeta(pWal) < 0) {
|
if (walSaveMeta(pWal) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue