feat(wal): remove wal log size limitation
This commit is contained in:
parent
a3133b9f6d
commit
bf37f3fa68
|
@ -33,16 +33,16 @@ extern "C" {
|
||||||
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }}
|
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }}
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
#define WAL_PROTO_VER 0
|
#define WAL_PROTO_VER 0
|
||||||
#define WAL_NOSUFFIX_LEN 20
|
#define WAL_NOSUFFIX_LEN 20
|
||||||
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
|
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
|
||||||
#define WAL_LOG_SUFFIX "log"
|
#define WAL_LOG_SUFFIX "log"
|
||||||
#define WAL_INDEX_SUFFIX "idx"
|
#define WAL_INDEX_SUFFIX "idx"
|
||||||
#define WAL_REFRESH_MS 1000
|
#define WAL_REFRESH_MS 1000
|
||||||
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalCkHead))
|
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
||||||
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
||||||
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
#define WAL_MAGIC 0xFAFBFCFDULL
|
||||||
#define WAL_MAGIC 0xFAFBFCFDULL
|
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TAOS_WAL_WRITE = 1,
|
TAOS_WAL_WRITE = 1,
|
||||||
|
|
|
@ -421,7 +421,7 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_DEFAULT_STABLES_HASH_SIZE 100
|
#define TSDB_DEFAULT_STABLES_HASH_SIZE 100
|
||||||
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000
|
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000
|
||||||
|
|
||||||
#define TSDB_MAX_WAL_SIZE (1024 * 1024 * 3)
|
#define TSDB_MAX_MSG_SIZE (1024 * 1024 * 10)
|
||||||
|
|
||||||
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
|
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,6 @@ void taosIp2String(uint32_t ip, char *str);
|
||||||
void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
|
void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
|
||||||
|
|
||||||
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
|
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
|
||||||
char *strDupUnquo(const char *src);
|
|
||||||
|
|
||||||
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
|
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
|
||||||
T_MD5_CTX context;
|
T_MD5_CTX context;
|
||||||
|
|
|
@ -40,11 +40,11 @@ bool tsPrintAuth = false;
|
||||||
|
|
||||||
// multi process
|
// multi process
|
||||||
int32_t tsMultiProcess = 0;
|
int32_t tsMultiProcess = 0;
|
||||||
int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2 + 1024;
|
int32_t tsMnodeShmSize = TSDB_MAX_MSG_SIZE * 2 + 1024;
|
||||||
int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 1024;
|
int32_t tsVnodeShmSize = TSDB_MAX_MSG_SIZE * 10 + 1024;
|
||||||
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
|
int32_t tsQnodeShmSize = TSDB_MAX_MSG_SIZE * 4 + 1024;
|
||||||
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
|
int32_t tsSnodeShmSize = TSDB_MAX_MSG_SIZE * 4 + 1024;
|
||||||
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
|
int32_t tsBnodeShmSize = TSDB_MAX_MSG_SIZE * 4 + 1024;
|
||||||
int32_t tsNumOfShmThreads = 1;
|
int32_t tsNumOfShmThreads = 1;
|
||||||
|
|
||||||
// queue & threads
|
// queue & threads
|
||||||
|
@ -387,11 +387,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
|
||||||
|
|
||||||
if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
tsNumOfRpcThreads = tsNumOfCores / 2;
|
tsNumOfRpcThreads = tsNumOfCores / 2;
|
||||||
|
@ -447,8 +447,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
||||||
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_WAL_SIZE * 10L, TSDB_MAX_WAL_SIZE * 10000L);
|
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, TSDB_MAX_MSG_SIZE * 10000L);
|
||||||
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_WAL_SIZE * 10L, INT64_MAX, 0) != 0)
|
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, 0) != 0)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
|
||||||
|
|
|
@ -5342,6 +5342,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
|
||||||
|
|
||||||
if (tEncodeCStr(pEncoder, pReq->tbName) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pReq->tbName) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pReq->action) < 0) return -1;
|
if (tEncodeI8(pEncoder, pReq->action) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->colId) < 0) return -1;
|
||||||
switch (pReq->action) {
|
switch (pReq->action) {
|
||||||
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
||||||
if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1;
|
||||||
|
@ -5392,6 +5393,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
|
||||||
|
|
||||||
if (tDecodeCStr(pDecoder, &pReq->tbName) < 0) return -1;
|
if (tDecodeCStr(pDecoder, &pReq->tbName) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->colId) < 0) return -1;
|
||||||
switch (pReq->action) {
|
switch (pReq->action) {
|
||||||
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
||||||
if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1;
|
if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1;
|
||||||
|
|
|
@ -231,7 +231,7 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
|
||||||
snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
||||||
mDebug("start to read sdb file:%s", file);
|
mDebug("start to read sdb file:%s", file);
|
||||||
|
|
||||||
SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
|
SSdbRaw *pRaw = taosMemoryMalloc(TSDB_MAX_MSG_SIZE + 100);
|
||||||
if (pRaw == NULL) {
|
if (pRaw == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mError("failed read sdb file since %s", terrstr());
|
mError("failed read sdb file since %s", terrstr());
|
||||||
|
@ -556,8 +556,9 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *ter
|
||||||
if (term != NULL) *term = commitTerm;
|
if (term != NULL) *term = commitTerm;
|
||||||
if (config != NULL) *config = commitConfig;
|
if (config != NULL) *config = commitConfig;
|
||||||
|
|
||||||
mDebug("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
|
mDebug("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64
|
||||||
pIter, commitIndex, commitTerm, commitConfig, pIter->name);
|
" file:%s",
|
||||||
|
pIter, commitIndex, commitTerm, commitConfig, pIter->name);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,6 @@ static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
|
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
|
||||||
ASSERT(pWal->fileInfoSet != NULL);
|
|
||||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
ASSERT(sz > 0);
|
ASSERT(sz > 0);
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -55,7 +54,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
|
||||||
|
|
||||||
int64_t fileSize = 0;
|
int64_t fileSize = 0;
|
||||||
taosStatFile(fnameStr, &fileSize, NULL);
|
taosStatFile(fnameStr, &fileSize, NULL);
|
||||||
int readSize = TMIN(WAL_MAX_SIZE + 2, fileSize);
|
int32_t readSize = TMIN(WAL_SCAN_BUF_SIZE, fileSize);
|
||||||
pLastFileInfo->fileSize = fileSize;
|
pLastFileInfo->fileSize = fileSize;
|
||||||
|
|
||||||
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
||||||
|
@ -73,7 +72,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosLSeekFile(pFile, -readSize, SEEK_END);
|
int64_t offset;
|
||||||
|
offset = taosLSeekFile(pFile, -readSize, SEEK_END);
|
||||||
if (readSize != taosReadFile(pFile, buf, readSize)) {
|
if (readSize != taosReadFile(pFile, buf, readSize)) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
|
@ -81,29 +81,53 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* haystack = buf;
|
|
||||||
char* found = NULL;
|
char* found = NULL;
|
||||||
char* candidate;
|
while (1) {
|
||||||
while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) {
|
char* haystack = buf;
|
||||||
// read and validate
|
char* candidate;
|
||||||
SWalCkHead* logContent = (SWalCkHead*)candidate;
|
while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) {
|
||||||
if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) {
|
// read and validate
|
||||||
found = candidate;
|
SWalCkHead* logContent = (SWalCkHead*)candidate;
|
||||||
|
if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) {
|
||||||
|
found = candidate;
|
||||||
|
}
|
||||||
|
haystack = candidate + 1;
|
||||||
}
|
}
|
||||||
haystack = candidate + 1;
|
if (found || offset == 0) break;
|
||||||
}
|
offset = TMIN(0, offset - readSize + 8);
|
||||||
if (found == buf) {
|
int64_t offset2 = taosLSeekFile(pFile, offset, SEEK_SET);
|
||||||
SWalCkHead* logContent = (SWalCkHead*)found;
|
ASSERT(offset == offset2);
|
||||||
if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) {
|
if (readSize != taosReadFile(pFile, buf, readSize)) {
|
||||||
// file has to be deleted
|
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
#if 0
|
||||||
|
if (found == buf) {
|
||||||
|
SWalCkHead* logContent = (SWalCkHead*)found;
|
||||||
|
if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) {
|
||||||
|
// file has to be deleted
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
taosCloseFile(&pFile);
|
||||||
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
// TODO truncate file
|
||||||
|
|
||||||
|
if (found == NULL) {
|
||||||
|
// file corrupted, no complete log
|
||||||
|
// TODO delete and search in previous files
|
||||||
|
ASSERT(0);
|
||||||
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
taosCloseFile(&pFile);
|
|
||||||
SWalCkHead* lastEntry = (SWalCkHead*)found;
|
SWalCkHead* lastEntry = (SWalCkHead*)found;
|
||||||
|
taosCloseFile(&pFile);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
|
||||||
return lastEntry->head.version;
|
return lastEntry->head.version;
|
||||||
}
|
}
|
||||||
|
|
|
@ -436,11 +436,6 @@ END:
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen) {
|
int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen) {
|
||||||
if (bodyLen > TSDB_MAX_WAL_SIZE) {
|
|
||||||
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pWal->mutex);
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
|
||||||
int64_t index = pWal->vers.lastVer + 1;
|
int64_t index = pWal->vers.lastVer + 1;
|
||||||
|
@ -472,10 +467,6 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync
|
||||||
int32_t bodyLen) {
|
int32_t bodyLen) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (bodyLen > TSDB_MAX_WAL_SIZE) {
|
|
||||||
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taosThreadMutexLock(&pWal->mutex);
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
|
||||||
// concurrency control:
|
// concurrency control:
|
||||||
|
|
|
@ -64,20 +64,6 @@ int32_t strdequote(char *z) {
|
||||||
return j + 1; // only one quote, do nothing
|
return j + 1; // only one quote, do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
char *strDupUnquo(const char *src) {
|
|
||||||
if (src == NULL) return NULL;
|
|
||||||
if (src[0] != '`') return strdup(src);
|
|
||||||
int32_t len = (int32_t)strlen(src);
|
|
||||||
if (src[len - 1] != '`') return NULL;
|
|
||||||
char *ret = taosMemoryMalloc(len);
|
|
||||||
if (ret == NULL) return NULL;
|
|
||||||
for (int32_t i = 0; i < len - 1; i++) {
|
|
||||||
ret[i] = src[i + 1];
|
|
||||||
}
|
|
||||||
ret[len - 1] = 0;
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t strtrim(char *z) {
|
size_t strtrim(char *z) {
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
int32_t j = 0;
|
int32_t j = 0;
|
||||||
|
|
Loading…
Reference in New Issue