From 6de9692b7d759d85fadcd2001cbcae748129ee58 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 3 Dec 2021 14:28:54 +0800 Subject: [PATCH] refactor wal --- include/libs/wal/wal.h | 28 +++++--- include/util/tfile.h | 1 + source/libs/wal/inc/walInt.h | 4 +- source/libs/wal/src/walIndex.c | 125 ++++++++++++++++----------------- source/libs/wal/src/walMgmt.c | 66 +++++++---------- source/libs/wal/src/walWrite.c | 113 ++++++++++++++++++++++++++++- source/util/src/tfile.c | 5 ++ 7 files changed, 225 insertions(+), 117 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index e77540bf90..b514648bbd 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -39,6 +39,14 @@ typedef enum { } EWalType; typedef struct { + //union { + //uint32_t info; + //struct { + //uint32_t sver:3; + //uint32_t msgtype: 5; + //uint32_t reserved : 24; + //}; + //}; int8_t sver; uint8_t msgType; int8_t reserved[2]; @@ -71,13 +79,17 @@ typedef struct { #define WAL_FILESET_MAX 128 #define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2) -#define WAL_CUR_POS_READ_ONLY 1 -#define WAL_CUR_FILE_READ_ONLY 2 +#define WAL_CUR_POS_WRITABLE 1 +#define WAL_CUR_FILE_WRITABLE 2 +#define WAL_CUR_FAILED 4 typedef struct SWal { // cfg int32_t vgId; int32_t fsyncPeriod; // millisecond + int32_t fsyncSeq; + int32_t rollPeriod; // second + int64_t segSize; EWalType level; //reference int64_t refId; @@ -86,7 +98,7 @@ typedef struct SWal { int64_t curIdxTfd; //current version int64_t curVersion; - int64_t curOffset; + int64_t curLogOffset; //current file version int64_t curFileFirstVersion; int64_t curFileLastVersion; @@ -94,8 +106,10 @@ typedef struct SWal { int64_t firstVersion; int64_t snapshotVersion; int64_t lastVersion; - //fsync status - int32_t fsyncSeq; + int64_t lastFileName; + //roll status + int64_t lastRollSeq; + int64_t lastFileWriteSize; //ctl int32_t curStatus; pthread_mutex_t mutex; @@ -119,12 +133,10 @@ int32_t walAlter(SWal *, SWalCfg *pCfg); void walClose(SWal *); // write -//int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen); int64_t walWrite(SWal *, int64_t index, uint8_t msgType, void *body, int32_t bodyLen); -//int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize); +void walFsync(SWal *, bool force); // apis for lifecycle management -void walFsync(SWal *, bool force); int32_t walCommit(SWal *, int64_t ver); // truncate after int32_t walRollback(SWal *, int64_t ver); diff --git a/include/util/tfile.h b/include/util/tfile.h index ff62c9e341..3d0e2177ac 100644 --- a/include/util/tfile.h +++ b/include/util/tfile.h @@ -28,6 +28,7 @@ void tfCleanup(); // the same syntax as UNIX standard open/close/read/write // but FD is int64_t and will never be reused +int64_t tfOpenRead(const char *pathname); int64_t tfOpenReadWrite(const char *pathname); int64_t tfOpenCreateWrite(const char *pathname); int64_t tfOpenCreateWriteAppend(const char *pathname); diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 42ede49c6b..285d7e2576 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -17,14 +17,16 @@ #define _TD_WAL_INT_H_ #include "wal.h" +#include "compare.h" #ifdef __cplusplus extern "C" { #endif -int walRotate(SWal* pWal); int walGetFile(SWal* pWal, int32_t version); +int64_t walGetSeq(); + #ifdef __cplusplus } #endif diff --git a/source/libs/wal/src/walIndex.c b/source/libs/wal/src/walIndex.c index 2569af841f..1aa64b34b5 100644 --- a/source/libs/wal/src/walIndex.c +++ b/source/libs/wal/src/walIndex.c @@ -20,9 +20,38 @@ #include "tfile.h" #include "walInt.h" -int walSeekVerImpl(SWal *pWal, int64_t ver) { - //close old file +static int walSeekFilePos(SWal* pWal, int64_t ver) { int code = 0; + + int64_t idxTfd = pWal->curIdxTfd; + int64_t logTfd = pWal->curLogTfd; + + //seek position + int64_t offset = (ver - pWal->curFileFirstVersion) * WAL_IDX_ENTRY_SIZE; + code = tfLseek(idxTfd, offset, SEEK_SET); + if(code != 0) { + + } + int64_t readBuf[2]; + code = tfRead(idxTfd, readBuf, sizeof(readBuf)); + if(code != 0) { + + } + //TODO:deserialize + ASSERT(readBuf[0] == ver); + code = tfLseek(logTfd, readBuf[1], SEEK_CUR); + if (code != 0) { + + } + pWal->curLogOffset = readBuf[1]; + pWal->curVersion = ver; + return code; +} + +static int walChangeFile(SWal *pWal, int64_t ver) { + int code = 0; + int64_t idxTfd, logTfd; + char fnameStr[WAL_FILE_LEN]; code = tfClose(pWal->curLogTfd); if(code != 0) { //TODO @@ -32,29 +61,36 @@ int walSeekVerImpl(SWal *pWal, int64_t ver) { //TODO } //bsearch in fileSet - int fName = 0;//TODO - //open the right file - char fNameStr[WAL_FILE_LEN]; - sprintf(fNameStr, "%d."WAL_INDEX_SUFFIX, fName); - bool closed = 1; //TODO:read only - int64_t idxTfd = tfOpenReadWrite(fNameStr); - sprintf(fNameStr, "%d."WAL_LOG_SUFFIX, fName); - int64_t logTfd = tfOpenReadWrite(fNameStr); - //seek position - int64_t offset = (ver - fName) * WAL_IDX_ENTRY_SIZE; - tfLseek(idxTfd, offset, SEEK_SET); - //set cur version, cur file version and cur status - pWal->curFileFirstVersion = fName; - pWal->curFileLastVersion = 1;//TODO + int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE); + ASSERT(pRet != NULL); + int64_t fname = *pRet; + if(fname < pWal->lastFileName) { + pWal->curStatus &= ~WAL_CUR_FILE_WRITABLE; + pWal->curFileLastVersion = pRet[1]-1; + sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname); + idxTfd = tfOpenRead(fnameStr); + sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname); + logTfd = tfOpenRead(fnameStr); + } else { + pWal->curStatus |= WAL_CUR_FILE_WRITABLE; + pWal->curFileLastVersion = -1; + sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname); + idxTfd = tfOpenReadWrite(fnameStr); + sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname); + logTfd = tfOpenReadWrite(fnameStr); + } + + pWal->curFileFirstVersion = fname; pWal->curLogTfd = logTfd; pWal->curIdxTfd = idxTfd; - pWal->curVersion = ver; - pWal->curOffset = offset; - pWal->curStatus = 0;//TODO return code; } int walSeekVer(SWal *pWal, int64_t ver) { + if((!(pWal->curStatus & WAL_CUR_FAILED)) + && ver == pWal->curVersion) { + return 0; + } if(ver > pWal->lastVersion) { //TODO: some records are skipped return -1; @@ -64,54 +100,13 @@ int walSeekVer(SWal *pWal, int64_t ver) { return -1; } if(ver < pWal->snapshotVersion) { - //TODO: seek snapshotted log + //TODO: seek snapshotted log, invalid in some cases } - if(ver >= pWal->curFileFirstVersion - && ((pWal->curFileLastVersion == -1 && ver <= pWal->lastVersion) || (ver <= pWal->curFileLastVersion))) { - - } - if(ver < pWal->curFileFirstVersion || (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) { - int index = 0; - index = 1; - //back up to avoid inconsistency - int64_t curVersion = pWal->curVersion; - int64_t curOffset = pWal->curOffset; - int64_t curFileFirstVersion = pWal->curFileFirstVersion; - int64_t curFileLastVersion = pWal->curFileLastVersion; - if(walSeekVerImpl(pWal, ver) < 0) { - //TODO: errno - pWal->curVersion = curVersion; - pWal->curOffset = curOffset; - pWal->curFileFirstVersion = curFileFirstVersion; - pWal->curFileLastVersion = curFileLastVersion; - return -1; - } + if(ver < pWal->curFileFirstVersion || + (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) { + walChangeFile(pWal, ver); } + walSeekFilePos(pWal, ver); return 0; } - -int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { - int code = 0; - //get index file - if(!tfValid(pWal->curIdxTfd)) { - code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); - } - if(pWal->curVersion != ver) { - if(walSeekVer(pWal, ver) != 0) { - //TODO: some records are skipped - return -1; - } - } - //check file checksum - //append index - return 0; -} - -int walRotateIndex(SWal *pWal) { - //check file checksum - //create new file - //switch file - return 0; -} diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index d60cdfe118..bc2e687069 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -26,57 +26,44 @@ int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId); int32_t walGetNewFile(SWal *pWal, int64_t *newFileId); -static pthread_mutex_t walInitLock = PTHREAD_MUTEX_INITIALIZER; -static int8_t walInited = 0; - typedef struct { int32_t refSetId; - int32_t seq; + uint32_t seq; int8_t stop; + int8_t inited; pthread_t thread; - pthread_mutex_t mutex; } SWalMgmt; -static SWalMgmt tsWal = {0}; +static SWalMgmt tsWal = {0, .seq = 1}; static int32_t walCreateThread(); static void walStopThread(); static int32_t walInitObj(SWal *pWal); static void walFreeObj(void *pWal); -int32_t walInit() { - //TODO: change to atomic - pthread_mutex_lock(&walInitLock); - if(walInited) { - pthread_mutex_unlock(&walInitLock); - return 0; - } else { - walInited = 1; - pthread_mutex_unlock(&walInitLock); - } +int64_t walGetSeq() { + return (int64_t)atomic_load_32(&tsWal.seq); +} + +int32_t walInit() { + int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1); + if(old == 1) return 0; - int32_t code = 0; tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); - code = pthread_mutex_init(&tsWal.mutex, NULL); - if (code != 0) { - wError("failed to init wal mutex since %s", tstrerror(code)); - return code; - } - - code = walCreateThread(); + int code = walCreateThread(); if (code != 0) { wError("failed to init wal module since %s", tstrerror(code)); + atomic_store_8(&tsWal.inited, 0); return code; } wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId); - return code; + return 0; } void walCleanUp() { walStopThread(); taosCloseRef(tsWal.refSetId); - pthread_mutex_destroy(&tsWal.mutex); wInfo("wal module is cleaned up"); } @@ -92,7 +79,7 @@ static int walLoadFileset(SWal *pWal) { char *name = ent->d_name; name[WAL_NOSUFFIX_LEN] = 0; //validate file name by regex matching - if(1 /* regex match */) { + if(1 /* TODO:regex match */) { int64_t fnameInt64 = atoll(name); taosArrayPush(pWal->fileSet, &fnameInt64); } @@ -133,6 +120,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { walFreeObj(pWal); return NULL; } + walLoadFileset(pWal); wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod); @@ -164,6 +152,9 @@ void walClose(SWal *pWal) { pthread_mutex_lock(&pWal->mutex); tfClose(pWal->curLogTfd); + tfClose(pWal->curIdxTfd); + taosArrayDestroy(pWal->fileSet); + pWal->fileSet = NULL; pthread_mutex_unlock(&pWal->mutex); taosRemoveRef(tsWal.refSetId, pWal->refId); } @@ -188,6 +179,9 @@ static void walFreeObj(void *wal) { wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); tfClose(pWal->curLogTfd); + tfClose(pWal->curIdxTfd); + taosArrayDestroy(pWal->fileSet); + pWal->fileSet = NULL; pthread_mutex_destroy(&pWal->mutex); tfree(pWal); } @@ -197,7 +191,7 @@ static bool walNeedFsync(SWal *pWal) { return false; } - if (tsWal.seq % pWal->fsyncSeq == 0) { + if (atomic_load_32(&tsWal.seq) % pWal->fsyncSeq == 0) { return true; } @@ -206,16 +200,14 @@ static bool walNeedFsync(SWal *pWal) { static void walUpdateSeq() { taosMsleep(WAL_REFRESH_MS); - if (++tsWal.seq <= 0) { - tsWal.seq = 1; - } + atomic_add_fetch_32(&tsWal.seq, 1); } static void walFsyncAll() { SWal *pWal = taosIterateRef(tsWal.refSetId, 0); while (pWal) { if (walNeedFsync(pWal)) { - wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); + wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq)); int32_t code = tfFsync(pWal->curLogTfd); if (code != 0) { wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(code)); @@ -226,16 +218,12 @@ static void walFsyncAll() { } static void *walThreadFunc(void *param) { - int stop = 0; setThreadName("wal"); while (1) { walUpdateSeq(); walFsyncAll(); - pthread_mutex_lock(&tsWal.mutex); - stop = tsWal.stop; - pthread_mutex_unlock(&tsWal.mutex); - if (stop) break; + if (atomic_load_8(&tsWal.stop)) break; } return NULL; @@ -258,9 +246,7 @@ static int32_t walCreateThread() { } static void walStopThread() { - pthread_mutex_lock(&tsWal.mutex); - tsWal.stop = 1; - pthread_mutex_unlock(&tsWal.mutex); + atomic_store_8(&tsWal.stop, 1); if (taosCheckPthreadValid(tsWal.thread)) { pthread_join(tsWal.thread, NULL); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 7563ec02c7..69c83a9912 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -26,10 +26,24 @@ int32_t walCommit(SWal *pWal, int64_t ver) { } int32_t walRollback(SWal *pWal, int64_t ver) { + //TODO: ftruncate return 0; } int32_t walTakeSnapshot(SWal *pWal, int64_t ver) { + pWal->snapshotVersion = ver; + + //mark files safe to delete + int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE); + if(pRet != pWal->fileSet->pData) { + //delete files until less than retention size + + //find first file that exceeds retention time + + } + + //delete files living longer than retention limit + //remove file from fileset return 0; } @@ -124,13 +138,102 @@ void walRemoveAllOldFiles(void *handle) { } #endif +static int walRoll(SWal *pWal) { + int code = 0; + code = tfClose(pWal->curIdxTfd); + if(code != 0) { + return code; + } + code = tfClose(pWal->curLogTfd); + if(code != 0) { + return code; + } + int64_t idxTfd, logTfd; + //create new file + int64_t newFileFirstVersion = pWal->lastVersion + 1; + char fnameStr[WAL_FILE_LEN]; + sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, newFileFirstVersion); + idxTfd = tfOpenCreateWrite(fnameStr); + sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, newFileFirstVersion); + logTfd = tfOpenCreateWrite(fnameStr); + + taosArrayPush(pWal->fileSet, &newFileFirstVersion); + + //switch file + pWal->curIdxTfd = idxTfd; + pWal->curLogTfd = logTfd; + //change status + pWal->curFileLastVersion = -1; + pWal->curFileFirstVersion = newFileFirstVersion; + pWal->curVersion = newFileFirstVersion; + pWal->curLogOffset = 0; + pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE; + + pWal->lastFileName = newFileFirstVersion; + pWal->lastFileWriteSize = 0; + pWal->lastRollSeq = walGetSeq(); + return 0; +} + +int walChangeFileToLast(SWal *pWal) { + int64_t idxTfd, logTfd; + int64_t* pRet = taosArrayGetLast(pWal->fileSet); + ASSERT(pRet != NULL); + int64_t fname = *pRet; + + char fnameStr[WAL_FILE_LEN]; + sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname); + idxTfd = tfOpenReadWrite(fnameStr); + sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname); + logTfd = tfOpenReadWrite(fnameStr); + //switch file + pWal->curIdxTfd = idxTfd; + pWal->curLogTfd = logTfd; + //change status + pWal->curFileLastVersion = -1; + pWal->curFileFirstVersion = fname; + pWal->curVersion = fname; + pWal->curLogOffset = 0; + pWal->curStatus = WAL_CUR_FILE_WRITABLE; + return 0; +} + +int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { + int code = 0; + //get index file + if(!tfValid(pWal->curIdxTfd)) { + code = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + } + int64_t writeBuf[2] = { ver, offset }; + int size = tfWrite(pWal->curIdxTfd, writeBuf, sizeof(writeBuf)); + if(size != sizeof(writeBuf)) { + //TODO: + } + return 0; +} + int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t bodyLen) { if (pWal == NULL) return -1; // no wal - if (!tfValid(pWal->curLogTfd)) return 0; if (pWal->level == TAOS_WAL_NOLOG) return 0; - if (index > pWal->lastVersion + 1) return -1; + + if (index == pWal->lastVersion + 1) { + int64_t passed = walGetSeq() - pWal->lastRollSeq; + if(passed > pWal->rollPeriod) { + walRoll(pWal); + } else if(pWal->lastFileWriteSize > pWal->segSize) { + walRoll(pWal); + } else { + walChangeFileToLast(pWal); + } + } else { + //reject skip log or rewrite log + //must truncate explicitly first + return -1; + } + if (!tfValid(pWal->curLogTfd)) return 0; pWal->head.version = index; int32_t code = 0; @@ -155,8 +258,12 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t code = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); } - //TODO:write idx + walWriteIndex(pWal, index, pWal->curLogOffset); + pWal->curLogOffset += sizeof(SWalHead) + bodyLen; + //set status + pWal->lastVersion = index; + pthread_mutex_unlock(&pWal->mutex); return code; diff --git a/source/util/src/tfile.c b/source/util/src/tfile.c index 5b61186b12..5d4789aae6 100644 --- a/source/util/src/tfile.c +++ b/source/util/src/tfile.c @@ -53,6 +53,11 @@ static int64_t tfOpenImp(int32_t fd) { return rid; } +int64_t tfOpenRead(const char *pathname, int32_t flags) { + int32_t fd = taosOpenFileRead(pathname); + return tfOpenImp(fd); +} + int64_t tfOpenReadWrite(const char *pathname, int32_t flags) { int32_t fd = taosOpenFileReadWrite(pathname); return tfOpenImp(fd);