From 9e4b8907273584992635fa1a4a56f4aa99570e33 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 11 Dec 2021 17:17:21 +0800 Subject: [PATCH] add test for wal snapshot --- include/libs/wal/wal.h | 47 +-- include/util/tarray.h | 15 + include/util/tchecksum.h | 2 +- source/libs/wal/inc/walInt.h | 30 ++ source/libs/wal/src/walMeta.c | 12 + source/libs/wal/src/walMgmt.c | 12 +- source/libs/wal/src/walRead.c | 32 +- source/libs/wal/src/{walIndex.c => walSeek.c} | 45 ++- source/libs/wal/src/walWrite.c | 307 +++++++++++------- source/libs/wal/test/walMetaTest.cpp | 128 ++++++-- source/util/src/tarray.c | 11 +- 11 files changed, 440 insertions(+), 201 deletions(-) rename source/libs/wal/src/{walIndex.c => walSeek.c} (76%) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index f84297670b..fe3ee17d00 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -38,6 +38,24 @@ typedef enum { TAOS_WAL_FSYNC = 2 } EWalType; +typedef struct SWalReadHead { + int8_t sver; + uint8_t msgType; + int8_t reserved[2]; + int32_t len; + int64_t version; + char cont[]; +} SWalReadHead; + +typedef struct { + int32_t vgId; + int32_t fsyncPeriod; // millisecond + int32_t retentionPeriod; // secs + int32_t rollPeriod; // secs + int64_t segSize; + EWalType walLevel; // wal level +} SWalCfg; + typedef struct { //union { //uint32_t info; @@ -47,25 +65,11 @@ typedef struct { //uint32_t reserved : 24; //}; //}; - int8_t sver; - uint8_t msgType; - int8_t reserved[2]; - int32_t len; - int64_t version; - uint32_t signature; uint32_t cksumHead; uint32_t cksumBody; - char cont[]; + SWalReadHead head; } SWalHead; -typedef struct { - int32_t vgId; - int32_t fsyncPeriod; // millisecond - int32_t rollPeriod; - int64_t segSize; - EWalType walLevel; // wal level -} SWalCfg; - #define WAL_PREFIX "wal" #define WAL_PREFIX_LEN 3 #define WAL_NOSUFFIX_LEN 20 @@ -80,7 +84,7 @@ typedef struct { //#define WAL_FILE_NUM 1 // 3 #define WAL_FILESET_MAX 128 -#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2) +#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2) #define WAL_CUR_POS_WRITABLE 1 #define WAL_CUR_FILE_WRITABLE 2 #define WAL_CUR_FAILED 4 @@ -103,21 +107,17 @@ typedef struct SWal { //write tfd int64_t writeLogTfd; int64_t writeIdxTfd; - //read tfd - int64_t readLogTfd; - int64_t readIdxTfd; - //current version - int64_t curVersion; //wal lifecycle int64_t firstVersion; int64_t snapshotVersion; int64_t commitVersion; int64_t lastVersion; + //snapshotting version + int64_t snapshottingVer; //roll status int64_t lastRollSeq; //file set int32_t writeCur; - int32_t readCur; SArray* fileInfoSet; //ctl int32_t curStatus; @@ -148,7 +148,8 @@ int32_t walCommit(SWal *, int64_t ver); // truncate after int32_t walRollback(SWal *, int64_t ver); // notify that previous logs can be pruned safely -int32_t walTakeSnapshot(SWal *, int64_t ver); +int32_t walBeginTakeSnapshot(SWal *, int64_t ver); +int32_t walEndTakeSnapshot(SWal *); //int32_t walDataCorrupted(SWal*); // read diff --git a/include/util/tarray.h b/include/util/tarray.h index 25862a7119..ad02b20868 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -153,6 +153,13 @@ void taosArraySet(SArray* pArray, size_t index, void* pData); */ void taosArrayPopFrontBatch(SArray* pArray, size_t cnt); +/** + * remove some data entry from front + * @param pArray + * @param cnt + */ +void taosArrayPopTailBatch(SArray* pArray, size_t cnt); + /** * remove data entry of the given index * @param pArray @@ -213,6 +220,14 @@ void taosArraySortString(SArray* pArray, __compar_fn_t comparFn); */ void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t comparFn, int flags); +/** + * search the array, return index of the element + * @param pArray + * @param compar + * @param key + */ +int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t comparFn, int flags); + /** * search the array * @param pArray diff --git a/include/util/tchecksum.h b/include/util/tchecksum.h index c1907b0c46..fb6c85edcd 100644 --- a/include/util/tchecksum.h +++ b/include/util/tchecksum.h @@ -39,7 +39,7 @@ static FORCE_INLINE int taosCalcChecksumAppend(TSCKSUM csi, uint8_t *stream, uin } static FORCE_INLINE int taosCheckChecksum(const uint8_t *stream, uint32_t ssize, TSCKSUM checksum) { - return (checksum == (*crc32c)(0, stream, (size_t)ssize)); + return (checksum != (*crc32c)(0, stream, (size_t)ssize)); } static FORCE_INLINE int taosCheckChecksumWhole(const uint8_t *stream, uint32_t ssize) { diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index f56f240904..f8f2e2eadd 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -18,6 +18,7 @@ #include "wal.h" #include "compare.h" +#include "tchecksum.h" #ifdef __cplusplus extern "C" { @@ -32,6 +33,11 @@ typedef struct WalFileInfo { int64_t fileSize; } WalFileInfo; +typedef struct WalIdxEntry { + int64_t ver; + int64_t offset; +} WalIdxEntry; + static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) { WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft; WalFileInfo* pInfoRight = (WalFileInfo*)pRight; @@ -79,6 +85,26 @@ static inline int walBuildIdxName(SWal*pWal, int64_t fileFirstVer, char* buf) { return sprintf(buf, "%s/%" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer); } +static inline int walValidHeadCksum(SWalHead* pHead) { + return taosCheckChecksum((uint8_t*)&pHead->head, sizeof(SWalReadHead), pHead->cksumHead); +} + +static inline int walValidBodyCksum(SWalHead* pHead) { + return taosCheckChecksum((uint8_t*)pHead->head.cont, pHead->head.len, pHead->cksumBody); +} + +static inline int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) { + return walValidHeadCksum(pHead) && walValidBodyCksum(pHead); +} + +static inline uint32_t walCalcHeadCksum(SWalHead *pHead) { + return taosCalcChecksum(0, (uint8_t*)&pHead->head, sizeof(SWalReadHead)); +} + +static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) { + return taosCalcChecksum(0, (uint8_t*)body, len); +} + int walReadMeta(SWal* pWal); int walWriteMeta(SWal* pWal); int walRollFileInfo(SWal* pWal); @@ -87,6 +113,10 @@ char* walMetaSerialize(SWal* pWal); int walMetaDeserialize(SWal* pWal, const char* bytes); //meta section end +//seek section +int walChangeFile(SWal *pWal, int64_t ver); +//seek section end + int64_t walGetSeq(); int walSeekVer(SWal *pWal, int64_t ver); int walRoll(SWal *pWal); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 65085bb96d..89588b1ddf 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -24,6 +24,18 @@ #include #include +int64_t walGetFirstVer(SWal *pWal) { + return pWal->firstVersion; +} + +int64_t walGetSnaphostVer(SWal *pWal) { + return pWal->snapshotVersion; +} + +int64_t walGetLastVer(SWal *pWal) { + return pWal->lastVersion; +} + int walRollFileInfo(SWal* pWal) { int64_t ts = taosGetTimestampSec(); diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 6bf9008917..9090348c04 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -82,6 +82,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { } pWal->writeLogTfd = -1; pWal->writeIdxTfd = -1; + pWal->writeCur = -1; //set config pWal->vgId = pCfg->vgId; @@ -90,13 +91,20 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { pWal->segSize = pCfg->segSize; pWal->level = pCfg->walLevel; - //init status + //init version info + pWal->firstVersion = -1; + pWal->commitVersion = -1; + pWal->snapshotVersion = -1; pWal->lastVersion = -1; + + pWal->snapshottingVer = -1; + + //init status pWal->lastRollSeq = -1; //init write buffer memset(&pWal->head, 0, sizeof(SWalHead)); - pWal->head.sver = 0; + pWal->head.head.sver = 0; tstrncpy(pWal->path, path, sizeof(pWal->path)); pthread_mutex_init(&pWal->mutex, NULL); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index e9f5bcbc5d..27dffddf83 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -15,19 +15,6 @@ #include "walInt.h" #include "tfile.h" -#include "tchecksum.h" - -static inline int walValidHeadCksum(SWalHead* pHead) { - return taosCheckChecksum((uint8_t*)pHead, sizeof(SWalHead) - sizeof(uint32_t)*2, pHead->cksumHead); -} - -static inline int walValidBodyCksum(SWalHead* pHead) { - return taosCheckChecksum((uint8_t*)pHead->cont, pHead->len, pHead->cksumBody); -} - -static int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) { - return walValidHeadCksum(pHead) && walValidBodyCksum(pHead); -} int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { int code; @@ -49,13 +36,13 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { if(walValidHeadCksum(*ppHead) != 0) { return -1; } - void* ptr = realloc(*ppHead, sizeof(SWalHead) + (*ppHead)->len); + void* ptr = realloc(*ppHead, sizeof(SWalHead) + (*ppHead)->head.len); if(ptr == NULL) { free(*ppHead); *ppHead = NULL; return -1; } - if(tfRead(pWal->writeLogTfd, (*ppHead)->cont, (*ppHead)->len) != (*ppHead)->len) { + if(tfRead(pWal->writeLogTfd, (*ppHead)->head.cont, (*ppHead)->head.len) != (*ppHead)->head.len) { return -1; } //TODO: endian compatibility processing after read @@ -69,18 +56,3 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) { return 0; } - -int64_t walGetFirstVer(SWal *pWal) { - if (pWal == NULL) return 0; - return pWal->firstVersion; -} - -int64_t walGetSnaphostVer(SWal *pWal) { - if (pWal == NULL) return 0; - return pWal->snapshotVersion; -} - -int64_t walGetLastVer(SWal *pWal) { - if (pWal == NULL) return 0; - return pWal->lastVersion; -} diff --git a/source/libs/wal/src/walIndex.c b/source/libs/wal/src/walSeek.c similarity index 76% rename from source/libs/wal/src/walIndex.c rename to source/libs/wal/src/walSeek.c index bf51be4346..48272f8f32 100644 --- a/source/libs/wal/src/walIndex.c +++ b/source/libs/wal/src/walSeek.c @@ -43,12 +43,35 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { if (code != 0) { return -1; } - /*pWal->curLogOffset = readBuf[1];*/ - pWal->curVersion = ver; return code; } -static int walChangeFile(SWal *pWal, int64_t ver) { +int walChangeFileToLast(SWal *pWal) { + int64_t idxTfd, logTfd; + WalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet); + ASSERT(pRet != NULL); + int64_t fileFirstVer = pRet->firstVer; + + char fnameStr[WAL_FILE_LEN]; + walBuildIdxName(pWal, fileFirstVer, fnameStr); + idxTfd = tfOpenReadWrite(fnameStr); + if(idxTfd < 0) { + return -1; + } + walBuildLogName(pWal, fileFirstVer, fnameStr); + logTfd = tfOpenReadWrite(fnameStr); + if(logTfd < 0) { + return -1; + } + //switch file + pWal->writeIdxTfd = idxTfd; + pWal->writeLogTfd = logTfd; + //change status + pWal->curStatus = WAL_CUR_FILE_WRITABLE; + return 0; +} + +int walChangeFile(SWal *pWal, int64_t ver) { int code = 0; int64_t idxTfd, logTfd; char fnameStr[WAL_FILE_LEN]; @@ -86,21 +109,21 @@ static int walChangeFile(SWal *pWal, int64_t ver) { return code; } +int walGetVerOffset(SWal* pWal, int64_t ver) { + int code; + return 0; +} + int walSeekVer(SWal *pWal, int64_t ver) { int code; - if((!(pWal->curStatus & WAL_CUR_FAILED)) && ver == pWal->curVersion) { + if(ver == pWal->lastVersion) { return 0; } - if(ver > pWal->lastVersion) { - //TODO: some records are skipped - return -1; - } - if(ver < pWal->firstVersion) { - //TODO: try to seek pruned log + if(ver > pWal->lastVersion || ver < pWal->firstVersion) { return -1; } if(ver < pWal->snapshotVersion) { - //TODO: seek snapshotted log, invalid in some cases + //TODO: set flag to prevent roll back } if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) { code = walChangeFile(pWal, ver); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 0c4989300f..87d52ad236 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -21,65 +21,6 @@ #include "tfile.h" #include "walInt.h" -static void walFtruncate(SWal *pWal, int64_t ver); - -int32_t walCommit(SWal *pWal, int64_t ver) { - ASSERT(pWal->snapshotVersion <= pWal->commitVersion); - ASSERT(pWal->commitVersion <= pWal->lastVersion); - ASSERT(ver >= pWal->commitVersion); - ASSERT(ver <= pWal->lastVersion); - pWal->commitVersion = ver; - return 0; -} - -int32_t walRollback(SWal *pWal, int64_t ver) { - //TODO: ftruncate - ASSERT(ver > pWal->commitVersion); - ASSERT(ver <= pWal->lastVersion); - //seek position - walSeekVer(pWal, ver); - walFtruncate(pWal, ver); - return 0; -} - -int32_t walTakeSnapshot(SWal *pWal, int64_t ver) { - pWal->snapshotVersion = ver; - int ts = taosGetTimestampSec(); - - int deleteCnt = 0; - int64_t newTotSize = pWal->totSize; - WalFileInfo tmp; - tmp.firstVer = ver; - //mark files safe to delete - WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); - //iterate files, until the searched result - for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { - if(pWal->totSize > pWal->retentionSize || - iter->closeTs + pWal->retentionPeriod > ts) { - //delete according to file size or close time - deleteCnt++; - newTotSize -= iter->fileSize; - } - } - char fnameStr[WAL_FILE_LEN]; - //remove file - for(int i = 0; i < deleteCnt; i++) { - WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, i); - walBuildLogName(pWal, pInfo->firstVer, fnameStr); - remove(fnameStr); - walBuildIdxName(pWal, pInfo->firstVer, fnameStr); - remove(fnameStr); - } - - //save snapshot ver, commit ver - - - //make new array, remove files - taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); - pWal->totSize = newTotSize; - - return 0; -} #if 0 static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId); @@ -172,6 +113,185 @@ void walRemoveAllOldFiles(void *handle) { } #endif +int32_t walCommit(SWal *pWal, int64_t ver) { + ASSERT(pWal->commitVersion >= pWal->snapshotVersion); + ASSERT(pWal->commitVersion <= pWal->lastVersion); + if(ver < pWal->commitVersion || ver > pWal->lastVersion) { + return -1; + } + pWal->commitVersion = ver; + return 0; +} + +int32_t walRollback(SWal *pWal, int64_t ver) { + int code; + char fnameStr[WAL_FILE_LEN]; + if(ver == pWal->lastVersion) { + return 0; + } + if(ver > pWal->lastVersion || ver < pWal->commitVersion) { + return -1; + } + pthread_mutex_lock(&pWal->mutex); + + //find correct file + if(ver < walGetLastFileFirstVer(pWal)) { + //close current files + tfClose(pWal->writeIdxTfd); + tfClose(pWal->writeLogTfd); + //open old files + code = walChangeFile(pWal, ver); + if(code != 0) { + return -1; + } + + //delete files + int fileSetSize = taosArrayGetSize(pWal->fileInfoSet); + for(int i = pWal->writeCur; i < fileSetSize; i++) { + walBuildLogName(pWal, ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); + remove(fnameStr); + walBuildIdxName(pWal, ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); + remove(fnameStr); + } + //pop from fileInfoSet + taosArraySetSize(pWal->fileInfoSet, pWal->writeCur + 1); + } + + walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr); + int64_t idxTfd = tfOpenReadWrite(fnameStr); + + //change to deserialize function + + if(idxTfd < 0) { + pthread_mutex_unlock(&pWal->mutex); + return -1; + } + int idxOff = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE; + code = tfLseek(idxTfd, idxOff, SEEK_SET); + if(code < 0) { + pthread_mutex_unlock(&pWal->mutex); + return -1; + } + //read idx file and get log file pos + //TODO:change to deserialize function + WalIdxEntry entry; + if(tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) { + pthread_mutex_unlock(&pWal->mutex); + return -1; + } + ASSERT(entry.ver == ver); + + walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr); + int64_t logTfd = tfOpenReadWrite(fnameStr); + if(logTfd < 0) { + //TODO + pthread_mutex_unlock(&pWal->mutex); + return -1; + } + code = tfLseek(logTfd, entry.offset, SEEK_SET); + if(code < 0) { + //TODO + pthread_mutex_unlock(&pWal->mutex); + return -1; + } + //validate offset + SWalHead head; + ASSERT(tfValid(logTfd)); + int size = tfRead(logTfd, &head, sizeof(SWalHead)); + if(size != sizeof(SWalHead)) { + return -1; + } + code = walValidHeadCksum(&head); + + ASSERT(code == 0); + if(code != 0) { + return -1; + } + if(head.head.version != ver) { + //TODO + return -1; + } + //truncate old files + code = tfFtruncate(logTfd, entry.offset); + if(code < 0) { + return -1; + } + code = tfFtruncate(idxTfd, idxOff); + if(code < 0) { + return -1; + } + pWal->lastVersion = ver - 1; + ((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1; + ((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset; + + //unlock + pthread_mutex_unlock(&pWal->mutex); + return 0; +} +int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) { + pWal->snapshottingVer = ver; + //check file rolling + if(pWal->retentionPeriod == 0) { + walRoll(pWal); + } + + return 0; +} + +int32_t walEndTakeSnapshot(SWal *pWal) { + int64_t ver = pWal->snapshottingVer; + if(ver == -1) return -1; + + pWal->snapshotVersion = ver; + int ts = taosGetTimestampSec(); + + int deleteCnt = 0; + int64_t newTotSize = pWal->totSize; + WalFileInfo tmp; + tmp.firstVer = ver; + //find files safe to delete + WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); + if(ver >= pInfo->lastVer) { + pInfo++; + } + //iterate files, until the searched result + for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { + if(pWal->totSize > pWal->retentionSize || + iter->closeTs + pWal->retentionPeriod > ts) { + //delete according to file size or close time + deleteCnt++; + newTotSize -= iter->fileSize; + } + } + char fnameStr[WAL_FILE_LEN]; + //remove file + for(int i = 0; i < deleteCnt; i++) { + WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, i); + walBuildLogName(pWal, pInfo->firstVer, fnameStr); + remove(fnameStr); + walBuildIdxName(pWal, pInfo->firstVer, fnameStr); + remove(fnameStr); + } + + //make new array, remove files + taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); + if(taosArrayGetSize(pWal->fileInfoSet) == 0) { + pWal->firstVersion = -1; + } else { + pWal->firstVersion = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; + } + pWal->totSize = newTotSize; + pWal->snapshottingVer = -1; + + //save snapshot ver, commit ver + int code = walWriteMeta(pWal); + if(code != 0) { + return -1; + } + + return 0; +} + int walRoll(SWal *pWal) { int code = 0; if(pWal->writeIdxTfd != -1) { @@ -211,6 +331,7 @@ int walRoll(SWal *pWal) { //switch file pWal->writeIdxTfd = idxTfd; pWal->writeLogTfd = logTfd; + pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; //change status pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE; @@ -218,32 +339,6 @@ int walRoll(SWal *pWal) { return 0; } -int walChangeFileToLast(SWal *pWal) { - int64_t idxTfd, logTfd; - WalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet); - ASSERT(pRet != NULL); - int64_t fileFirstVer = pRet->firstVer; - - char fnameStr[WAL_FILE_LEN]; - walBuildIdxName(pWal, fileFirstVer, fnameStr); - idxTfd = tfOpenReadWrite(fnameStr); - if(idxTfd < 0) { - return -1; - } - walBuildLogName(pWal, fileFirstVer, fnameStr); - logTfd = tfOpenReadWrite(fnameStr); - if(logTfd < 0) { - return -1; - } - //switch file - pWal->writeIdxTfd = idxTfd; - pWal->writeLogTfd = logTfd; - //change status - pWal->curVersion = fileFirstVer; - pWal->curStatus = WAL_CUR_FILE_WRITABLE; - return 0; -} - static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { int code = 0; //get index file @@ -253,9 +348,11 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); return code; } - int64_t writeBuf[2] = { ver, offset }; - int size = tfWrite(pWal->writeIdxTfd, writeBuf, sizeof(writeBuf)); - if(size != sizeof(writeBuf)) { + char fnameStr[WAL_FILE_LEN]; + walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr); + WalIdxEntry entry = { .ver = ver, .offset = offset }; + int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(WalIdxEntry)); + if(size != sizeof(WalIdxEntry)) { return -1; } return 0; @@ -270,13 +367,14 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i if (index == pWal->lastVersion + 1) { if(taosArrayGetSize(pWal->fileInfoSet) == 0) { + pWal->firstVersion = index; code = walRoll(pWal); ASSERT(code == 0); } else { int64_t passed = walGetSeq() - pWal->lastRollSeq; - if(pWal->rollPeriod != -1 && passed > pWal->rollPeriod) { + if(pWal->rollPeriod != -1 && pWal->rollPeriod != 0 && passed > pWal->rollPeriod) { walRoll(pWal); - } else if(pWal->segSize != -1 && walGetLastFileSize(pWal) > pWal->segSize) { + } else if(pWal->segSize != -1 && pWal->segSize != 0 && walGetLastFileSize(pWal) > pWal->segSize) { walRoll(pWal); } } @@ -287,16 +385,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i } /*if (!tfValid(pWal->curLogTfd)) return 0;*/ - pWal->head.version = index; - - pWal->head.signature = WAL_SIGNATURE; - pWal->head.len = bodyLen; - pWal->head.msgType = msgType; - - pWal->head.cksumHead = taosCalcChecksum(0, (const uint8_t*)&pWal->head, sizeof(SWalHead)- sizeof(uint32_t)*2); - pWal->head.cksumBody = taosCalcChecksum(0, (const uint8_t*)&body, bodyLen); - pthread_mutex_lock(&pWal->mutex); + pWal->head.head.version = index; + + pWal->head.head.len = bodyLen; + pWal->head.head.msgType = msgType; + pWal->head.cksumHead = walCalcHeadCksum(&pWal->head); + pWal->head.cksumBody = walCalcBodyCksum(body, bodyLen); if (tfWrite(pWal->writeLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) { //ftruncate @@ -312,6 +407,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal)); if(code != 0) { //TODO + return -1; } //set status @@ -326,8 +422,6 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i } void walFsync(SWal *pWal, bool forceFsync) { - if (pWal == NULL || !tfValid(pWal->writeLogTfd)) return; - if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal)); if (tfFsync(pWal->writeLogTfd) < 0) { @@ -408,7 +502,7 @@ static int walValidateOffset(SWal* pWal, int64_t ver) { int code = 0; SWalHead *pHead = NULL; code = (int)walRead(pWal, &pHead, ver); - if(pHead->version != ver) { + if(pHead->head.version != ver) { return -1; } return 0; @@ -428,15 +522,6 @@ static int64_t walGetOffset(SWal* pWal, int64_t ver) { return 0; } -static void walFtruncate(SWal *pWal, int64_t ver) { - int64_t tfd = pWal->writeLogTfd; - tfFtruncate(tfd, ver); - tfFsync(tfd); - tfd = pWal->writeIdxTfd; - tfFtruncate(tfd, ver * WAL_IDX_ENTRY_SIZE); - tfFsync(tfd); -} - #if 0 static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) { int64_t pos = *offset; diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index 96258662a1..f244917f77 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -36,6 +36,36 @@ class WalCleanEnv : public ::testing::Test { const char* pathName = "/tmp/wal_test"; }; +class WalCleanDeleteEnv : public ::testing::Test { + protected: + static void SetUpTestCase() { + int code = walInit(); + ASSERT(code == 0); + } + + static void TearDownTestCase() { + walCleanUp(); + } + + void SetUp() override { + taosRemoveDir(pathName); + SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal)); + memset(pCfg, 0, sizeof(SWalCfg)); + pCfg->retentionPeriod = 0; + pCfg->walLevel = TAOS_WAL_FSYNC; + pWal = walOpen(pathName, pCfg); + ASSERT(pWal != NULL); + } + + void TearDown() override { + walClose(pWal); + pWal = NULL; + } + + SWal* pWal = NULL; + const char* pathName = "/tmp/wal_test"; +}; + class WalKeepEnv : public ::testing::Test { protected: static void SetUpTestCase() { @@ -110,40 +140,94 @@ TEST_F(WalCleanEnv, removeOldMeta) { ASSERT(code == 0); } -TEST_F(WalKeepEnv, readOldMeta) { - int code = walRollFileInfo(pWal); - ASSERT(code == 0); - code = walWriteMeta(pWal); - ASSERT(code == 0); - code = walRollFileInfo(pWal); - ASSERT(code == 0); - code = walWriteMeta(pWal); - ASSERT(code == 0); - char*oldss = walMetaSerialize(pWal); +//TEST_F(WalKeepEnv, readOldMeta) { + //int code = walRollFileInfo(pWal); + //ASSERT(code == 0); + //code = walWriteMeta(pWal); + //ASSERT(code == 0); + //code = walRollFileInfo(pWal); + //ASSERT(code == 0); + //code = walWriteMeta(pWal); + //ASSERT(code == 0); + //char*oldss = walMetaSerialize(pWal); - TearDown(); - SetUp(); - code = walReadMeta(pWal); - ASSERT(code == 0); - char* newss = walMetaSerialize(pWal); + //TearDown(); + //SetUp(); + //code = walReadMeta(pWal); + //ASSERT(code == 0); + //char* newss = walMetaSerialize(pWal); - int len = strlen(oldss); - ASSERT_EQ(len, strlen(newss)); - for(int i = 0; i < len; i++) { - EXPECT_EQ(oldss[i], newss[i]); - } -} + //int len = strlen(oldss); + //ASSERT_EQ(len, strlen(newss)); + //for(int i = 0; i < len; i++) { + //EXPECT_EQ(oldss[i], newss[i]); + //} +//} -TEST_F(WalKeepEnv, write) { +TEST_F(WalCleanEnv, write) { const char* ranStr = "tvapq02tcp"; const int len = strlen(ranStr); int code; for(int i = 0; i < 10; i++) { code = walWrite(pWal, i, i+1, (void*)ranStr, len); ASSERT_EQ(code, 0); + ASSERT_EQ(pWal->lastVersion, i); code = walWrite(pWal, i+2, i, (void*)ranStr, len); ASSERT_EQ(code, -1); + ASSERT_EQ(pWal->lastVersion, i); } code = walWriteMeta(pWal); ASSERT_EQ(code, 0); } + +TEST_F(WalCleanEnv, rollback) { + const char* ranStr = "tvapq02tcp"; + const int len = strlen(ranStr); + int code; + for(int i = 0; i < 10; i++) { + code = walWrite(pWal, i, i+1, (void*)ranStr, len); + ASSERT_EQ(code, 0); + ASSERT_EQ(pWal->lastVersion, i); + } + code = walRollback(pWal, 5); + ASSERT_EQ(code, 0); + ASSERT_EQ(pWal->lastVersion, 4); + code = walRollback(pWal, 3); + ASSERT_EQ(code, 0); + ASSERT_EQ(pWal->lastVersion, 2); + code = walWriteMeta(pWal); + ASSERT_EQ(code, 0); +} + +TEST_F(WalCleanDeleteEnv, roll) { + const char* ranStr = "tvapq02tcp"; + const int len = strlen(ranStr); + int code; + int i; + for(i = 0; i < 100; i++) { + code = walWrite(pWal, i, 0, (void*)ranStr, len); + ASSERT_EQ(code, 0); + ASSERT_EQ(pWal->lastVersion, i); + code = walCommit(pWal, i); + ASSERT_EQ(pWal->commitVersion, i); + } + + walBeginTakeSnapshot(pWal, i-1); + ASSERT_EQ(pWal->snapshottingVer, i-1); + walEndTakeSnapshot(pWal); + ASSERT_EQ(pWal->snapshotVersion, i-1); + ASSERT_EQ(pWal->snapshottingVer, -1); + + code = walWrite(pWal, 5, 0, (void*)ranStr, len); + ASSERT_NE(code, 0); + + for(; i < 200; i++) { + code = walWrite(pWal, i, 0, (void*)ranStr, len); + ASSERT_EQ(code, 0); + code = walCommit(pWal, i); + ASSERT_EQ(pWal->commitVersion, i); + } + + code = walWriteMeta(pWal); + ASSERT_EQ(code, 0); +} diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index ff52477a6f..2191419670 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -241,12 +241,16 @@ void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) { assert(cnt <= pArray->size); pArray->size = pArray->size - cnt; if(pArray->size == 0) { - pArray->size = 0; return; } memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size); } +void taosArrayPopTailBatch(SArray* pArray, size_t cnt) { + assert(cnt <= pArray->size); + pArray->size = pArray->size - cnt; +} + void taosArrayRemove(SArray* pArray, size_t index) { assert(index < pArray->size); @@ -329,6 +333,11 @@ void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t compa return taosbsearch(key, pArray->pData, pArray->size, pArray->elemSize, comparFn, flags); } +int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t comparFn, int flags) { + void* item = taosArraySearch(pArray, key, comparFn, flags); + return (int32_t)((char*)item - (char*)pArray->pData) / pArray->elemSize; +} + void taosArraySortString(SArray* pArray, __compar_fn_t comparFn) { assert(pArray != NULL); qsort(pArray->pData, pArray->size, pArray->elemSize, comparFn);