From c048dc5b60b5841557f1f43b00f33e6c65597841 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 14 Dec 2021 18:14:45 +0800 Subject: [PATCH] refactor wal --- include/libs/wal/wal.h | 44 ++-- include/util/tref.h | 2 + source/libs/wal/inc/walInt.h | 14 +- source/libs/wal/src/walMeta.c | 18 +- source/libs/wal/src/walMgmt.c | 97 ++++----- source/libs/wal/src/walRead.c | 8 +- source/libs/wal/src/walSeek.c | 6 +- source/libs/wal/src/walUtil.c | 120 ----------- source/libs/wal/src/walWrite.c | 310 +-------------------------- source/libs/wal/test/walMetaTest.cpp | 18 +- 10 files changed, 98 insertions(+), 539 deletions(-) delete mode 100644 source/libs/wal/src/walUtil.c diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 744275e6ff..e19d65837a 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -32,23 +32,19 @@ extern int32_t wDebugFlag; #define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} -#define WAL_PREFIX "wal" -#define WAL_PREFIX_LEN 3 +#define WAL_HEAD_VER 0 #define WAL_NOSUFFIX_LEN 20 #define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1) #define WAL_LOG_SUFFIX "log" #define WAL_INDEX_SUFFIX "idx" #define WAL_REFRESH_MS 1000 -#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16) +#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead)) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_FILE_LEN (WAL_PATH_LEN + 32) -#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2) -#define WAL_CUR_POS_WRITABLE 1 -#define WAL_CUR_FILE_WRITABLE 2 -#define WAL_CUR_FAILED 4 +#define WAL_CUR_FAILED 1 -#pragma pack(push,1) +#pragma pack(push, 1) typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, @@ -56,11 +52,11 @@ typedef enum { } EWalType; typedef struct SWalReadHead { - int8_t sver; + int8_t headVer; uint8_t msgType; int8_t reserved[2]; int32_t len; - //int64_t ingestTs; //not implemented + int64_t ingestTs; //not implemented int64_t version; char body[]; } SWalReadHead; @@ -72,18 +68,10 @@ typedef struct { int32_t rollPeriod; // secs int64_t retentionSize; int64_t segSize; - EWalType level; // wal level + EWalType level; // wal level } SWalCfg; typedef struct { - //union { - //uint32_t info; - //struct { - //uint32_t sver:3; - //uint32_t msgtype: 5; - //uint32_t reserved : 24; - //}; - //}; uint32_t cksumHead; uint32_t cksumBody; SWalReadHead head; @@ -102,16 +90,16 @@ typedef struct SWal { SWalCfg cfg; SWalVer vers; //file set - int32_t writeCur; int64_t writeLogTfd; int64_t writeIdxTfd; + int32_t writeCur; SArray* fileInfoSet; - //ctl - int32_t curStatus; - int32_t fsyncSeq; + //statistics int64_t totSize; - int64_t refId; int64_t lastRollSeq; + //ctl + int32_t fsyncSeq; + int64_t refId; pthread_mutex_t mutex; //path char path[WAL_PATH_LEN]; @@ -131,7 +119,7 @@ typedef struct SWalReadHandle { } SWalReadHandle; #pragma pack(pop) -typedef int32_t (*FWalWrite)(void *ahandle, void *pHead); +//typedef int32_t (*FWalWrite)(void *ahandle, void *pHead); // module initialization int32_t walInit(); @@ -151,8 +139,8 @@ int32_t walCommit(SWal *, int64_t ver); // truncate after int32_t walRollback(SWal *, int64_t ver); // notify that previous logs can be pruned safely -int32_t walBeginTakeSnapshot(SWal *, int64_t ver); -int32_t walEndTakeSnapshot(SWal *); +int32_t walBeginSnapshot(SWal *, int64_t ver); +int32_t walEndSnapshot(SWal *); //int32_t walDataCorrupted(SWal*); // read @@ -161,7 +149,7 @@ void walCloseReadHandle(SWalReadHandle *); int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver); int32_t walRead(SWal *, SWalHead **, int64_t ver); -int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); +//int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); // lifecycle check int64_t walGetFirstVer(SWal *); diff --git a/include/util/tref.h b/include/util/tref.h index cc7d075f52..6680204d63 100644 --- a/include/util/tref.h +++ b/include/util/tref.h @@ -17,6 +17,8 @@ #ifndef _TD_UTIL_REF_H #define _TD_UTIL_REF_H +#include "os.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index e546a87326..48142878c3 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -33,12 +33,10 @@ typedef struct WalFileInfo { int64_t fileSize; } WalFileInfo; -#pragma pack(push,1) typedef struct WalIdxEntry { int64_t ver; int64_t offset; } WalIdxEntry; -#pragma pack(pop) static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) { WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft; @@ -107,8 +105,16 @@ static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) { return taosCalcChecksum(0, (uint8_t*)body, len); } -int walReadMeta(SWal* pWal); -int walWriteMeta(SWal* pWal); +static inline void walResetVer(SWalVer* pVer) { + pVer->firstVer = -1; + pVer->verInSnapshotting = -1; + pVer->snapshotVer = -1; + pVer->commitVer = -1; + pVer->lastVer = -1; +} + +int walLoadMeta(SWal* pWal); +int walSaveMeta(SWal* pWal); int walRollFileInfo(SWal* pWal); char* walMetaSerialize(SWal* pWal); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 49f4fde3a0..aa592b4fe8 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -24,18 +24,22 @@ #include #include -int64_t walGetFirstVer(SWal *pWal) { +int64_t inline walGetFirstVer(SWal *pWal) { return pWal->vers.firstVer; } -int64_t walGetSnaphostVer(SWal *pWal) { +int64_t inline walGetSnaphostVer(SWal *pWal) { return pWal->vers.snapshotVer; } -int64_t walGetLastVer(SWal *pWal) { +int64_t inline walGetLastVer(SWal *pWal) { return pWal->vers.lastVer; } +static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { + return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); +} + int walRollFileInfo(SWal* pWal) { int64_t ts = taosGetTimestampSec(); @@ -150,10 +154,6 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { return 0; } -static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { - return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); -} - static int walFindCurMetaVer(SWal* pWal) { const char * pattern = "^meta-ver[0-9]+$"; regex_t walMetaRegexPattern; @@ -182,7 +182,7 @@ static int walFindCurMetaVer(SWal* pWal) { return metaVer; } -int walWriteMeta(SWal* pWal) { +int walSaveMeta(SWal* pWal) { int metaVer = walFindCurMetaVer(pWal); char fnameStr[WAL_FILE_LEN]; walBuildMetaName(pWal, metaVer+1, fnameStr); @@ -207,7 +207,7 @@ int walWriteMeta(SWal* pWal) { return 0; } -int walReadMeta(SWal* pWal) { +int walLoadMeta(SWal* pWal) { ASSERT(pWal->fileInfoSet->size == 0); //find existing meta file int metaVer = walFindCurMetaVer(pWal); diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 629451a722..9efeb83cf0 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -21,23 +21,17 @@ #include "compare.h" #include "walInt.h" -//internal -int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); -int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId); -int32_t walGetNewFile(SWal *pWal, int64_t *newFileId); - typedef struct { - int32_t refSetId; - uint32_t seq; int8_t stop; int8_t inited; + uint32_t seq; + int32_t refSetId; pthread_t thread; } SWalMgmt; static SWalMgmt tsWal = {0, .seq = 1}; static int32_t walCreateThread(); static void walStopThread(); -static int32_t walInitObj(SWal *pWal); static void walFreeObj(void *pWal); int64_t walGetSeq() { @@ -68,7 +62,7 @@ int32_t walInit() { } void walCleanUp() { - int old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0); + int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0); if(old == 0) { return; } @@ -83,48 +77,59 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } - memset(pWal, 0, sizeof(SWal)); - pWal->writeLogTfd = -1; - pWal->writeIdxTfd = -1; - pWal->writeCur = -1; //set config memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg)); + pWal->fsyncSeq = pCfg->fsyncPeriod / 1000; + if(pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; - //init version info - pWal->vers.firstVer = -1; - pWal->vers.commitVer = -1; - pWal->vers.snapshotVer = -1; - pWal->vers.lastVer = -1; + tstrncpy(pWal->path, path, sizeof(pWal->path)); + if(taosMkDir(pWal->path) != 0) { + wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno)); + return NULL; + } - pWal->vers.verInSnapshotting = -1; - - pWal->totSize = 0; + //open meta + pWal->writeLogTfd = -1; + pWal->writeIdxTfd = -1; + pWal->writeCur = -1; + pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo)); + if(pWal->fileInfoSet == NULL) { + wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno)); + free(pWal); + return NULL; + } //init status + walResetVer(&pWal->vers); + pWal->totSize = 0; pWal->lastRollSeq = -1; //init write buffer memset(&pWal->writeHead, 0, sizeof(SWalHead)); - pWal->writeHead.head.sver = 0; + pWal->writeHead.head.headVer = WAL_HEAD_VER; - tstrncpy(pWal->path, path, sizeof(pWal->path)); - pthread_mutex_init(&pWal->mutex, NULL); - - pWal->fsyncSeq = pCfg->fsyncPeriod / 1000; - if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; - - if (walInitObj(pWal) != 0) { - walFreeObj(pWal); + if(pthread_mutex_init(&pWal->mutex, NULL) < 0) { + taosArrayDestroy(pWal->fileInfoSet); + free(pWal); return NULL; } - pWal->refId = taosAddRef(tsWal.refSetId, pWal); - if (pWal->refId < 0) { - walFreeObj(pWal); + pWal->refId = taosAddRef(tsWal.refSetId, pWal); + if(pWal->refId < 0) { + pthread_mutex_destroy(&pWal->mutex); + taosArrayDestroy(pWal->fileInfoSet); + free(pWal); + return NULL; + } + + if(walLoadMeta(pWal) < 0) { + taosRemoveRef(tsWal.refSetId, pWal->refId); + pthread_mutex_destroy(&pWal->mutex); + taosArrayDestroy(pWal->fileInfoSet); + free(pWal); return NULL; } - walReadMeta(pWal); wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod); @@ -152,43 +157,23 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { } void walClose(SWal *pWal) { - if (pWal == NULL) return; - pthread_mutex_lock(&pWal->mutex); tfClose(pWal->writeLogTfd); pWal->writeLogTfd = -1; tfClose(pWal->writeIdxTfd); pWal->writeIdxTfd = -1; - walWriteMeta(pWal); + walSaveMeta(pWal); taosArrayDestroy(pWal->fileInfoSet); pWal->fileInfoSet = NULL; pthread_mutex_unlock(&pWal->mutex); + taosRemoveRef(tsWal.refSetId, pWal->refId); } -static int32_t walInitObj(SWal *pWal) { - if (taosMkDir(pWal->path) != 0) { - wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); - } - pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo)); - if(pWal->fileInfoSet == NULL) { - wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); - } - - wDebug("vgId:%d, object is initialized", pWal->cfg.vgId); - return 0; -} - static void walFreeObj(void *wal) { SWal *pWal = wal; wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal); - tfClose(pWal->writeLogTfd); - tfClose(pWal->writeIdxTfd); - taosArrayDestroy(pWal->fileInfoSet); - pWal->fileInfoSet = NULL; pthread_mutex_destroy(&pWal->mutex); tfree(pWal); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index b6aafedea3..42fcb8c375 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -54,7 +54,7 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i int64_t logTfd = pRead->readLogTfd; //seek position - int64_t offset = (ver - fileFirstVer) * WAL_IDX_ENTRY_SIZE; + int64_t offset = (ver - fileFirstVer) * sizeof(WalIdxEntry); code = tfLseek(idxTfd, offset, SEEK_SET); if(code < 0) { return -1; @@ -210,6 +210,6 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { return 0; } -int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) { - return 0; -} +/*int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {*/ + /*return 0;*/ +/*}*/ diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index 953aae703c..7db5b90c1d 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -27,7 +27,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { int64_t logTfd = pWal->writeLogTfd; //seek position - int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE; + int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry); code = tfLseek(idxTfd, offset, SEEK_SET); if(code != 0) { return -1; @@ -66,8 +66,6 @@ int walChangeFileToLast(SWal *pWal) { //switch file pWal->writeIdxTfd = idxTfd; pWal->writeLogTfd = logTfd; - //change status - pWal->curStatus = WAL_CUR_FILE_WRITABLE; return 0; } @@ -93,13 +91,11 @@ int walChangeFile(SWal *pWal, int64_t ver) { int64_t fileFirstVer = pRet->firstVer; //closed if(taosArrayGetLast(pWal->fileInfoSet) != pRet) { - pWal->curStatus &= ~WAL_CUR_FILE_WRITABLE; walBuildIdxName(pWal, fileFirstVer, fnameStr); idxTfd = tfOpenRead(fnameStr); walBuildLogName(pWal, fileFirstVer, fnameStr); logTfd = tfOpenRead(fnameStr); } else { - pWal->curStatus |= WAL_CUR_FILE_WRITABLE; walBuildIdxName(pWal, fileFirstVer, fnameStr); idxTfd = tfOpenReadWrite(fnameStr); walBuildLogName(pWal, fileFirstVer, fnameStr); diff --git a/source/libs/wal/src/walUtil.c b/source/libs/wal/src/walUtil.c deleted file mode 100644 index 849d0c3e51..0000000000 --- a/source/libs/wal/src/walUtil.c +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "walInt.h" - -#if 0 -int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) { - int64_t curFileId = *nextFileId; - int64_t minFileId = INT64_MAX; - - DIR *dir = opendir(pWal->path); - if (dir == NULL) { - wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); - return -1; - } - - struct dirent *ent; - while ((ent = readdir(dir)) != NULL) { - char *name = ent->d_name; - - if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { - int64_t id = atoll(name + WAL_PREFIX_LEN); - if (id <= curFileId) continue; - - if (id < minFileId) { - minFileId = id; - } - } - } - closedir(dir); - - if (minFileId == INT64_MAX) return -1; - - *nextFileId = minFileId; - wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " nextFileId:%" PRId64, pWal->vgId, pWal->path, curFileId, *nextFileId); - - return 0; -} - -int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId) { - int64_t minFileId = INT64_MAX; - - DIR *dir = opendir(pWal->path); - if (dir == NULL) { - wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); - return -1; - } - - struct dirent *ent; - while ((ent = readdir(dir)) != NULL) { - char *name = ent->d_name; - - if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { - int64_t id = atoll(name + WAL_PREFIX_LEN); - if (id >= curFileId) continue; - - minDiff--; - if (id < minFileId) { - minFileId = id; - } - } - } - closedir(dir); - - if (minFileId == INT64_MAX) return -1; - if (minDiff > 0) return -1; - - *oldFileId = minFileId; - wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " oldFildId:%" PRId64, pWal->vgId, pWal->path, curFileId, *oldFileId); - - return 0; -} - -int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) { - int64_t maxFileId = INT64_MIN; - - DIR *dir = opendir(pWal->path); - if (dir == NULL) { - wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); - return -1; - } - - struct dirent *ent; - while ((ent = readdir(dir)) != NULL) { - char *name = ent->d_name; - - if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { - int64_t id = atoll(name + WAL_PREFIX_LEN); - if (id > maxFileId) { - maxFileId = id; - } - } - } - closedir(dir); - - if (maxFileId == INT64_MIN) { - *newFileId = 0; - } else { - *newFileId = maxFileId; - } - - wTrace("vgId:%d, path:%s, newFileId:%" PRId64, pWal->vgId, pWal->path, *newFileId); - - return 0; -} -#endif diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 994b8fc333..ffbb19c6b7 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -21,98 +21,6 @@ #include "tfile.h" #include "walInt.h" - -#if 0 -static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId); - -int32_t walRenew(void *handle) { - if (handle == NULL) return 0; - - SWal * pWal = handle; - int32_t code = 0; - - /*if (pWal->stop) {*/ - /*wDebug("vgId:%d, do not create a new wal file", pWal->vgId);*/ - /*return 0;*/ - /*}*/ - - pthread_mutex_lock(&pWal->mutex); - - if (tfValid(pWal->logTfd)) { - tfClose(pWal->logTfd); - wDebug("vgId:%d, file:%s, it is closed while renew", pWal->vgId, pWal->logName); - } - - /*if (pWal->keep == TAOS_WAL_KEEP) {*/ - /*pWal->fileId = 0;*/ - /*} else {*/ - /*if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0;*/ - /*pWal->fileId++;*/ - /*}*/ - - snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId); - pWal->logTfd = tfOpenCreateWrite(pWal->logName); - - if (!tfValid(pWal->logTfd)) { - code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno)); - } else { - wDebug("vgId:%d, file:%s, it is created and open while renew", pWal->vgId, pWal->logName); - } - - pthread_mutex_unlock(&pWal->mutex); - - return code; -} - -void walRemoveOneOldFile(void *handle) { - SWal *pWal = handle; - if (pWal == NULL) return; - /*if (pWal->keep == TAOS_WAL_KEEP) return;*/ - if (!tfValid(pWal->logTfd)) return; - - pthread_mutex_lock(&pWal->mutex); - - // remove the oldest wal file - int64_t oldFileId = -1; - if (walGetOldFile(pWal, pWal->curFileId, WAL_FILE_NUM, &oldFileId) == 0) { - char walName[WAL_FILE_LEN] = {0}; - snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId); - - if (remove(walName) < 0) { - wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno)); - } else { - wInfo("vgId:%d, file:%s, it is removed", pWal->vgId, walName); - } - } - - pthread_mutex_unlock(&pWal->mutex); -} - -void walRemoveAllOldFiles(void *handle) { - if (handle == NULL) return; - - SWal * pWal = handle; - int64_t fileId = -1; - - pthread_mutex_lock(&pWal->mutex); - - tfClose(pWal->logTfd); - wDebug("vgId:%d, file:%s, it is closed before remove all wals", pWal->vgId, pWal->logName); - - while (walGetNextFile(pWal, &fileId) >= 0) { - snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); - - if (remove(pWal->logName) < 0) { - wError("vgId:%d, wal:%p file:%s, failed to remove since %s", pWal->vgId, pWal, pWal->logName, strerror(errno)); - } else { - wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->logName); - } - } - pthread_mutex_unlock(&pWal->mutex); -} -#endif - int32_t walCommit(SWal *pWal, int64_t ver) { ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer); ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer); @@ -166,7 +74,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { pthread_mutex_unlock(&pWal->mutex); return -1; } - int idxOff = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE; + int idxOff = (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry); code = tfLseek(idxTfd, idxOff, SEEK_SET); if(code < 0) { pthread_mutex_unlock(&pWal->mutex); @@ -229,7 +137,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { return 0; } -int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) { +int32_t walBeginSnapshot(SWal* pWal, int64_t ver) { pWal->vers.verInSnapshotting = ver; //check file rolling if(pWal->cfg.retentionPeriod == 0) { @@ -239,7 +147,7 @@ int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) { return 0; } -int32_t walEndTakeSnapshot(SWal *pWal) { +int32_t walEndSnapshot(SWal *pWal) { int64_t ver = pWal->vers.verInSnapshotting; if(ver == -1) return -1; @@ -287,7 +195,7 @@ int32_t walEndTakeSnapshot(SWal *pWal) { pWal->vers.verInSnapshotting = -1; //save snapshot ver, commit ver - int code = walWriteMeta(pWal); + int code = walSaveMeta(pWal); if(code != 0) { return -1; } @@ -314,13 +222,13 @@ int walRoll(SWal *pWal) { int64_t newFileFirstVersion = pWal->vers.lastVer + 1; char fnameStr[WAL_FILE_LEN]; walBuildIdxName(pWal, newFileFirstVersion, fnameStr); - idxTfd = tfOpenCreateWrite(fnameStr); + idxTfd = tfOpenCreateWriteAppend(fnameStr); if(idxTfd < 0) { ASSERT(0); return -1; } walBuildLogName(pWal, newFileFirstVersion, fnameStr); - logTfd = tfOpenCreateWrite(fnameStr); + logTfd = tfOpenCreateWriteAppend(fnameStr); if(logTfd < 0) { ASSERT(0); return -1; @@ -335,8 +243,6 @@ int walRoll(SWal *pWal) { pWal->writeIdxTfd = idxTfd; pWal->writeLogTfd = logTfd; pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; - //change status - pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE; pWal->lastRollSeq = walGetSeq(); return 0; @@ -425,74 +331,6 @@ void walFsync(SWal *pWal, bool forceFsync) { } } -#if 0 -int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) { - if (handle == NULL) return -1; - - SWal * pWal = handle; - int32_t count = 0; - int32_t code = 0; - int64_t fileId = -1; - - while ((code = walGetNextFile(pWal, &fileId)) >= 0) { - /*if (fileId == pWal->curFileId) continue;*/ - - char walName[WAL_FILE_LEN]; - snprintf(walName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); - - wInfo("vgId:%d, file:%s, will be restored", pWal->vgId, walName); - code = walRestoreWalFile(pWal, pVnode, writeFp, walName, fileId); - if (code != TSDB_CODE_SUCCESS) { - wError("vgId:%d, file:%s, failed to restore since %s", pWal->vgId, walName, tstrerror(code)); - continue; - } - - wInfo("vgId:%d, file:%s, restore success, wver:%" PRIu64, pWal->vgId, walName, pWal->curVersion); - - count++; - } - - /*if (pWal->keep != TAOS_WAL_KEEP) return TSDB_CODE_SUCCESS;*/ - - if (count == 0) { - wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId); - return walRenew(pWal); - } else { - // open the existing WAL file in append mode - /*pWal->curFileId = 0;*/ - snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId); - pWal->logTfd = tfOpenCreateWriteAppend(pWal->logName); - if (!tfValid(pWal->logTfd)) { - wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); - } - wDebug("vgId:%d, file:%s, it is created and open while restore", pWal->vgId, pWal->logName); - } - - return TSDB_CODE_SUCCESS; -} - -int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { - if (handle == NULL) return -1; - SWal *pWal = handle; - - if (*fileId == 0) *fileId = -1; - - pthread_mutex_lock(&(pWal->mutex)); - - int32_t code = walGetNextFile(pWal, fileId); - if (code >= 0) { - sprintf(fileName, "wal/%s%" PRId64, WAL_PREFIX, *fileId); - /*code = (*fileId == pWal->curFileId) ? 0 : 1;*/ - } - - wDebug("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->curFileId, *fileId); - pthread_mutex_unlock(&(pWal->mutex)); - - return code; -} -#endif - /*static int walValidateOffset(SWal* pWal, int64_t ver) {*/ /*int code = 0;*/ /*SWalHead *pHead = NULL;*/ @@ -516,139 +354,3 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { /*return 0;*/ /*}*/ - -#if 0 -static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) { - int64_t pos = *offset; - while (1) { - pos++; - - if (tfLseek(tfd, pos, SEEK_SET) < 0) { - wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno)); - return TSDB_CODE_WAL_FILE_CORRUPTED; - } - - if (tfRead(tfd, pHead, sizeof(SWalHead)) <= 0) { - wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); - return TSDB_CODE_WAL_FILE_CORRUPTED; - } - - if (pHead->signature != WAL_SIGNATURE) { - continue; - } - - if (pHead->sver >= 1) { - if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) { - wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); - return TSDB_CODE_WAL_FILE_CORRUPTED; - } - - if (walValidateChecksum(pHead)) { - wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos); - *offset = pos; - return TSDB_CODE_SUCCESS; - } - } - } - - return TSDB_CODE_WAL_FILE_CORRUPTED; -} - -static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) { - int32_t size = WAL_MAX_SIZE; - void * buffer = malloc(size); - if (buffer == NULL) { - wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); - } - - int64_t tfd = tfOpenReadWrite(name); - if (!tfValid(tfd)) { - wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); - tfree(buffer); - return TAOS_SYSTEM_ERROR(errno); - } else { - wDebug("vgId:%d, file:%s, open for restore", pWal->vgId, name); - } - - int32_t code = TSDB_CODE_SUCCESS; - int64_t offset = 0; - SWalHead *pHead = buffer; - - while (1) { - int32_t ret = (int32_t)tfRead(tfd, pHead, sizeof(SWalHead)); - if (ret == 0) break; - - if (ret < 0) { - wError("vgId:%d, file:%s, failed to read wal head since %s", pWal->vgId, name, strerror(errno)); - code = TAOS_SYSTEM_ERROR(errno); - break; - } - - if (ret < sizeof(SWalHead)) { - wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret); - walFtruncate(pWal, tfd, offset); - break; - } - - if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 2) { - wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, - pHead->version, pHead->len, offset); - code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); - if (code != TSDB_CODE_SUCCESS) { - walFtruncate(pWal, tfd, offset); - break; - } - } - - if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) { - wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, - pHead->version, pHead->len, offset); - code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); - if (code != TSDB_CODE_SUCCESS) { - walFtruncate(pWal, tfd, offset); - break; - } - } - - ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len); - if (ret < 0) { - wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno)); - code = TAOS_SYSTEM_ERROR(errno); - break; - } - - if (ret < pHead->len) { - wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len); - offset += sizeof(SWalHead); - continue; - } - - if ((pHead->sver >= 1) && !walValidateChecksum(pHead)) { - wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, - pHead->version, pHead->len, offset); - code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); - if (code != TSDB_CODE_SUCCESS) { - walFtruncate(pWal, tfd, offset); - break; - } - } - - offset = offset + sizeof(SWalHead) + pHead->len; - - wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64, - pWal->vgId, fileId, pHead->version, pWal->curVersion, pHead->len, offset); - - pWal->curVersion = pHead->version; - - // wInfo("writeFp: %ld", offset); - (*writeFp)(pVnode, pHead); - } - - tfClose(tfd); - tfree(buffer); - - wDebug("vgId:%d, file:%s, it is closed after restore", pWal->vgId, name); - return code; -} -#endif diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index 200bf39c5a..d06388201e 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -142,7 +142,7 @@ TEST_F(WalCleanEnv, serialize) { char*ss = walMetaSerialize(pWal); printf("%s\n", ss); free(ss); - code = walWriteMeta(pWal); + code = walSaveMeta(pWal); ASSERT(code == 0); } @@ -150,11 +150,11 @@ TEST_F(WalCleanEnv, removeOldMeta) { int code = walRollFileInfo(pWal); ASSERT(code == 0); ASSERT(pWal->fileInfoSet != NULL); - code = walWriteMeta(pWal); + code = walSaveMeta(pWal); ASSERT(code == 0); code = walRollFileInfo(pWal); ASSERT(code == 0); - code = walWriteMeta(pWal); + code = walSaveMeta(pWal); ASSERT(code == 0); } @@ -199,7 +199,7 @@ TEST_F(WalCleanEnv, write) { ASSERT_EQ(code, -1); ASSERT_EQ(pWal->vers.lastVer, i); } - code = walWriteMeta(pWal); + code = walSaveMeta(pWal); ASSERT_EQ(code, 0); } @@ -216,7 +216,7 @@ TEST_F(WalCleanEnv, rollback) { code = walRollback(pWal, 3); ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, 2); - code = walWriteMeta(pWal); + code = walSaveMeta(pWal); ASSERT_EQ(code, 0); } @@ -231,9 +231,9 @@ TEST_F(WalCleanDeleteEnv, roll) { ASSERT_EQ(pWal->vers.commitVer, i); } - walBeginTakeSnapshot(pWal, i-1); + walBeginSnapshot(pWal, i-1); ASSERT_EQ(pWal->vers.verInSnapshotting, i-1); - walEndTakeSnapshot(pWal); + walEndSnapshot(pWal); ASSERT_EQ(pWal->vers.snapshotVer, i-1); ASSERT_EQ(pWal->vers.verInSnapshotting, -1); @@ -247,9 +247,9 @@ TEST_F(WalCleanDeleteEnv, roll) { ASSERT_EQ(pWal->vers.commitVer, i); } - code = walBeginTakeSnapshot(pWal, i - 1); + code = walBeginSnapshot(pWal, i - 1); ASSERT_EQ(code, 0); - code = walEndTakeSnapshot(pWal); + code = walEndSnapshot(pWal); ASSERT_EQ(code, 0); }