diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h
index a72765583e..f84297670b 100644
--- a/include/libs/wal/wal.h
+++ b/include/libs/wal/wal.h
@@ -91,7 +91,8 @@ typedef struct SWal {
   int32_t fsyncPeriod; // millisecond
   int32_t rollPeriod; // second
   int64_t segSize;
-  int64_t rtSize;
+  int64_t retentionSize;
+  int32_t retentionPeriod;
   EWalType level;
   //total size
   int64_t totSize;
@@ -99,31 +100,24 @@ typedef struct SWal {
   int32_t fsyncSeq;
   //reference
   int64_t refId;
-  //current tfd
-  int64_t curLogTfd;
-  int64_t curIdxTfd;
+  //write tfd
+  int64_t writeLogTfd;
+  int64_t writeIdxTfd;
+  //read tfd
+  int64_t readLogTfd;
+  int64_t readIdxTfd;
   //current version
   int64_t curVersion;
-
-  //current file version
-  //int64_t curFileFirstVersion;
-  //int64_t curFileLastVersion;
-
   //wal lifecycle
   int64_t firstVersion;
   int64_t snapshotVersion;
   int64_t commitVersion;
   int64_t lastVersion;
-
-  //last file
-  //int64_t lastFileName;
-
   //roll status
   int64_t lastRollSeq;
-  //int64_t lastFileWriteSize;
-
   //file set
-  int32_t fileCursor;
+  int32_t writeCur;
+  int32_t readCur;
   SArray* fileInfoSet;
   //ctl
   int32_t curStatus;
diff --git a/include/util/tarray.h b/include/util/tarray.h
index 5807c980e0..25862a7119 100644
--- a/include/util/tarray.h
+++ b/include/util/tarray.h
@@ -146,6 +146,13 @@ void* taosArrayInsert(SArray* pArray, size_t index, void* pData);
  */
 void taosArraySet(SArray* pArray, size_t index, void* pData);
 
+/**
+ * remove the first cnt entries and shift the remaining ones to the front
+ * @param pArray
+ * @param cnt number of leading entries to drop, must not exceed the array size
+ */
+void taosArrayPopFrontBatch(SArray* pArray, size_t cnt);
+
 /**
  * remove data entry of the given index
  * @param pArray
diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h
index ae655d61da..a8b2d2fc7f 100644
--- a/source/libs/wal/inc/walInt.h
+++ b/source/libs/wal/inc/walInt.h
@@ -49,26 +49,26 @@ static inline int64_t walGetLastFileFirstVer(SWal* pWal) {
 }
 
 static inline int64_t walGetCurFileFirstVer(SWal* pWal) {
-  WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
+  WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
   return pInfo->firstVer;
 }
 
 static inline int64_t walGetCurFileLastVer(SWal* pWal) {
-  WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
-  return pInfo->firstVer;
+  WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
+  return pInfo->lastVer;
 }
 
 static inline int64_t walGetCurFileOffset(SWal* pWal) {
-  WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
+  WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
   return pInfo->fileSize;
 }
 
 static inline bool walCurFileClosed(SWal* pWal) {
-  return taosArrayGetSize(pWal->fileInfoSet) != pWal->fileCursor;
+  return taosArrayGetSize(pWal->fileInfoSet) != pWal->writeCur;
 }
 
 static inline WalFileInfo* walGetCurFileInfo(SWal* pWal) {
-  return (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
+  return (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
 }
 
 static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) {
diff --git a/source/libs/wal/src/walIndex.c b/source/libs/wal/src/walIndex.c
index b4d66226d6..bf51be4346 100644
--- a/source/libs/wal/src/walIndex.c
+++ b/source/libs/wal/src/walIndex.c
@@ -23,25 +23,25 @@
 static int walSeekFilePos(SWal* pWal, int64_t ver) {
   int code = 0;
 
-  int64_t idxTfd = pWal->curIdxTfd;
-  int64_t logTfd = pWal->curLogTfd;
+  int64_t idxTfd = pWal->writeIdxTfd;
+  int64_t logTfd = pWal->writeLogTfd;
 
   //seek position
   int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE;
   code = tfLseek(idxTfd, offset, SEEK_SET);
-  if(code != 0) {
-
+  if(code < 0) { //NOTE(review): assumes tfLseek returns the new offset like lseek(2), negative on error - confirm
+    return -1;
   }
   int64_t readBuf[2];
   code = tfRead(idxTfd, readBuf, sizeof(readBuf));
-  if(code != 0) {
-
+  if(code != sizeof(readBuf)) { //tfRead reports the byte count, a short read is the failure
+    return -1;
   }
   //TODO:deserialize
   ASSERT(readBuf[0] == ver);
   code = tfLseek(logTfd, readBuf[1], SEEK_CUR);
-  if (code != 0) {
-
+  if (code < 0) { //same lseek-like assumption as for the index file seek
+    return -1;
   }
   /*pWal->curLogOffset = readBuf[1];*/
   pWal->curVersion = ver;
@@ -52,11 +52,11 @@ static int walChangeFile(SWal *pWal, int64_t ver) {
   int code = 0;
   int64_t idxTfd, logTfd;
   char fnameStr[WAL_FILE_LEN];
-  code = tfClose(pWal->curLogTfd);
+  code = tfClose(pWal->writeLogTfd);
   if(code != 0) {
    //TODO
   }
-  code = tfClose(pWal->curIdxTfd);
+  code = tfClose(pWal->writeIdxTfd);
   if(code != 0) {
    //TODO
   }
@@ -81,14 +81,14 @@ static int walChangeFile(SWal *pWal, int64_t ver) {
     logTfd = tfOpenReadWrite(fnameStr);
   }
 
-  pWal->curLogTfd = logTfd;
-  pWal->curIdxTfd = idxTfd;
+  pWal->writeLogTfd = logTfd;
+  pWal->writeIdxTfd = idxTfd;
   return code;
 }
 
 int walSeekVer(SWal *pWal, int64_t ver) {
-  if((!(pWal->curStatus & WAL_CUR_FAILED))
-      && ver == pWal->curVersion) {
+  int code;
+  if((!(pWal->curStatus & WAL_CUR_FAILED)) && ver == pWal->curVersion) {
     return 0;
   }
   if(ver > pWal->lastVersion) {
@@ -103,9 +103,15 @@ int walSeekVer(SWal *pWal, int64_t ver) {
     //TODO: seek snapshotted log, invalid in some cases
   }
   if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
-    walChangeFile(pWal, ver);
+    code = walChangeFile(pWal, ver);
+    if(code != 0) {
+      return -1;
+    }
   }
-  walSeekFilePos(pWal, ver);
-
+  code = walSeekFilePos(pWal, ver);
+  if(code != 0) {
+    return -1;
+  }
+
   return 0;
 }
diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c
index acb173b17b..4104e0e6ce 100644
--- a/source/libs/wal/src/walMgmt.c
+++ b/source/libs/wal/src/walMgmt.c
@@ -48,9 +48,15 @@ int32_t walInit() {
   int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1);
   if(old == 1) return 0;
 
+  int code = tfInit();
+  if(code != 0) {
+    wError("failed to init tfile since %s", tstrerror(code));
+    atomic_store_8(&tsWal.inited, 0);
+    return code;
+  }
   tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
 
-  int code = walCreateThread();
+  code = walCreateThread();
   if (code != 0) {
     wError("failed to init wal module since %s", tstrerror(code));
     atomic_store_8(&tsWal.inited, 0);
@@ -74,8 +80,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
     terrno = TAOS_SYSTEM_ERROR(errno);
     return NULL;
   }
-  pWal->curLogTfd = -1;
-  pWal->curIdxTfd = -1;
+  pWal->writeLogTfd = -1;
+  pWal->writeIdxTfd = -1;
 
   //set config
   pWal->vgId = pCfg->vgId;
@@ -138,8 +144,8 @@ void walClose(SWal *pWal) {
   if (pWal == NULL) return;
 
   pthread_mutex_lock(&pWal->mutex);
-  tfClose(pWal->curLogTfd);
-  tfClose(pWal->curIdxTfd);
+  tfClose(pWal->writeLogTfd);
+  tfClose(pWal->writeIdxTfd);
   /*taosArrayDestroy(pWal->fileInfoSet);*/
   /*pWal->fileInfoSet = NULL;*/
   pthread_mutex_unlock(&pWal->mutex);
@@ -165,8 +171,8 @@ static void walFreeObj(void *wal) {
   SWal *pWal = wal;
   wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal);
 
-  tfClose(pWal->curLogTfd);
-  tfClose(pWal->curIdxTfd);
+  tfClose(pWal->writeLogTfd);
+  tfClose(pWal->writeIdxTfd);
   taosArrayDestroy(pWal->fileInfoSet);
   pWal->fileInfoSet = NULL;
   taosArrayDestroy(pWal->fileInfoSet);
@@ -197,7 +203,7 @@ static void walFsyncAll() {
   while (pWal) {
     if (walNeedFsync(pWal)) {
       wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq));
-      int32_t code = tfFsync(pWal->curLogTfd);
+      int32_t code = tfFsync(pWal->writeLogTfd);
       if (code != 0) {
         wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(code));
       }
diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c
index 90ec5528c4..e9f5bcbc5d 100644
--- a/source/libs/wal/src/walRead.c
+++ b/source/libs/wal/src/walRead.c
@@ -42,7 +42,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
     }
     *ppHead = ptr;
   }
-  if(tfRead(pWal->curLogTfd, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
+  if(tfRead(pWal->writeLogTfd, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
     return -1;
   }
   //TODO: endian compatibility processing after read
@@ -55,7 +55,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
     *ppHead = NULL;
     return -1;
   }
-  if(tfRead(pWal->curLogTfd, (*ppHead)->cont, (*ppHead)->len) != (*ppHead)->len) {
+  if(tfRead(pWal->writeLogTfd, (*ppHead)->cont, (*ppHead)->len) != (*ppHead)->len) {
     return -1;
   }
   //TODO: endian compatibility processing after read
diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c
index 3c65698938..0c4989300f 100644
--- a/source/libs/wal/src/walWrite.c
+++ b/source/libs/wal/src/walWrite.c
@@ -44,18 +44,39 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
 
 int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
   pWal->snapshotVersion = ver;
+  int ts = taosGetTimestampSec();
 
+  int deleteCnt = 0;
+  int64_t newTotSize = pWal->totSize;
   WalFileInfo tmp;
   tmp.firstVer = ver;
 
   //mark files safe to delete
   WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
   //iterate files, until the searched result
-  //if totSize > rtSize, delete
-  //if createTs > retentionTs, delete
+  for(WalFileInfo* iter = pWal->fileInfoSet->pData; pInfo != NULL && iter < pInfo; iter++) {
+    if(newTotSize <= pWal->retentionSize &&
+        iter->closeTs + pWal->retentionPeriod >= ts) {
+      break; //within the size budget and not yet expired: keep this file and all later ones
+    }
+    deleteCnt++;
+    newTotSize -= iter->fileSize;
+  }
+  char fnameStr[WAL_FILE_LEN];
+  //remove the log and index file of each of the first deleteCnt entries
+  for(int i = 0; i < deleteCnt; i++) {
+    WalFileInfo* pDel = taosArrayGet(pWal->fileInfoSet, i);
+    walBuildLogName(pWal, pDel->firstVer, fnameStr);
+    remove(fnameStr);
+    walBuildIdxName(pWal, pDel->firstVer, fnameStr);
+    remove(fnameStr);
+  }
 
   //save snapshot ver, commit ver
 
+  //drop the removed files from the file set and account for the freed size
+  taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
+  pWal->totSize = newTotSize;
 
   return 0;
 }
@@ -153,14 +174,14 @@ void walRemoveAllOldFiles(void *handle) {
 
 int walRoll(SWal *pWal) {
   int code = 0;
-  if(pWal->curIdxTfd != -1) {
-    code = tfClose(pWal->curIdxTfd);
+  if(pWal->writeIdxTfd != -1) {
+    code = tfClose(pWal->writeIdxTfd);
     if(code != 0) {
       return -1;
     }
   }
-  if(pWal->curLogTfd != -1) {
-    code = tfClose(pWal->curLogTfd);
+  if(pWal->writeLogTfd != -1) {
+    code = tfClose(pWal->writeLogTfd);
     if(code != 0) {
       return -1;
     }
@@ -188,8 +209,8 @@ int walRoll(SWal *pWal) {
   }
 
   //switch file
-  pWal->curIdxTfd = idxTfd;
-  pWal->curLogTfd = logTfd;
+  pWal->writeIdxTfd = idxTfd;
+  pWal->writeLogTfd = logTfd;
   //change status
   pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE;
 
@@ -215,8 +236,8 @@ int walChangeFileToLast(SWal *pWal) {
     return -1;
   }
   //switch file
-  pWal->curIdxTfd = idxTfd;
-  pWal->curLogTfd = logTfd;
+  pWal->writeIdxTfd = idxTfd;
+  pWal->writeLogTfd = logTfd;
   //change status
   pWal->curVersion = fileFirstVer;
   pWal->curStatus = WAL_CUR_FILE_WRITABLE;
@@ -226,15 +247,14 @@ int walChangeFileToLast(SWal *pWal) {
 static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
   int code = 0;
   //get index file
-  if(!tfValid(pWal->curIdxTfd)) {
+  if(!tfValid(pWal->writeIdxTfd)) {
     code = TAOS_SYSTEM_ERROR(errno);
-    WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
-    wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pInfo->firstVer, strerror(errno));
+    wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
     return code;
   }
 
   int64_t writeBuf[2] = { ver, offset };
-  int size = tfWrite(pWal->curIdxTfd, writeBuf, sizeof(writeBuf));
+  int size = tfWrite(pWal->writeIdxTfd, writeBuf, sizeof(writeBuf));
   if(size != sizeof(writeBuf)) {
     return -1;
   }
@@ -278,13 +298,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
 
   pthread_mutex_lock(&pWal->mutex);
 
-  if (tfWrite(pWal->curLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) {
+  if (tfWrite(pWal->writeLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) {
     //ftruncate
     code = TAOS_SYSTEM_ERROR(errno);
     wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
   }
 
-  if (tfWrite(pWal->curLogTfd, &body, bodyLen) != bodyLen) {
+  if (tfWrite(pWal->writeLogTfd, body, bodyLen) != bodyLen) {
     //ftruncate
     code = TAOS_SYSTEM_ERROR(errno);
     wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
@@ -296,6 +316,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
 
   //set status
   pWal->lastVersion = index;
+  pWal->totSize += sizeof(SWalHead) + bodyLen;
   walGetCurFileInfo(pWal)->lastVer = index;
   walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen;
 
@@ -305,11 +326,11 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
 }
 
 void walFsync(SWal *pWal, bool forceFsync) {
-  if (pWal == NULL || !tfValid(pWal->curLogTfd)) return;
+  if (pWal == NULL || !tfValid(pWal->writeLogTfd)) return;
 
   if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
     wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal));
-    if (tfFsync(pWal->curLogTfd) < 0) {
+    if (tfFsync(pWal->writeLogTfd) < 0) {
       wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, walGetCurFileFirstVer(pWal), strerror(errno));
     }
   }
@@ -408,10 +429,10 @@ static int64_t walGetOffset(SWal* pWal, int64_t ver) {
 }
 
 static void walFtruncate(SWal *pWal, int64_t ver) {
-  int64_t tfd = pWal->curLogTfd;
+  int64_t tfd = pWal->writeLogTfd;
   tfFtruncate(tfd, ver);
   tfFsync(tfd);
-  tfd = pWal->curIdxTfd;
+  tfd = pWal->writeIdxTfd;
   tfFtruncate(tfd, ver * WAL_IDX_ENTRY_SIZE);
   tfFsync(tfd);
 }
diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp
index 4c0533d389..46328bb626 100644
--- a/source/libs/wal/test/walMetaTest.cpp
+++ b/source/libs/wal/test/walMetaTest.cpp
@@ -3,7 +3,6 @@
 #include
 #include
 
-#include "tfile.h"
 #include "walInt.h"
 
 class WalCleanEnv : public ::testing::Test {
@@ -11,13 +10,10 @@ class WalCleanEnv : public ::testing::Test {
     static void SetUpTestCase() {
       int code = walInit();
       ASSERT(code == 0);
-      code = tfInit();
-      ASSERT(code == 0);
     }
 
     static void TearDownTestCase() {
       walCleanUp();
-      tfCleanup();
     }
 
     void SetUp() override {
@@ -45,13 +41,10 @@ class WalKeepEnv : public ::testing::Test {
     static void SetUpTestCase() {
       int code = walInit();
       ASSERT(code == 0);
-      code = tfInit();
-      ASSERT(code == 0);
     }
 
     static void TearDownTestCase() {
       walCleanUp();
-      tfCleanup();
     }
 
     void SetUp() override {
diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c
index 007ce06829..ff52477a6f 100644
--- a/source/util/src/tarray.c
+++ b/source/util/src/tarray.c
@@ -237,6 +237,16 @@ void taosArraySet(SArray* pArray, size_t index, void* pData) {
   memcpy(TARRAY_GET_ELEM(pArray, index), pData, pArray->elemSize);
 }
 
+void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) {
+  assert(cnt <= pArray->size);
+  pArray->size = pArray->size - cnt;
+  if(pArray->size == 0) {
+    //every entry was popped, nothing left to shift
+    return;
+  }
+  memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size * pArray->elemSize);
+}
+
 void taosArrayRemove(SArray* pArray, size_t index) {
   assert(index < pArray->size);
 
diff --git a/source/util/src/tfile.c b/source/util/src/tfile.c
index 5d4789aae6..313f1d97af 100644
--- a/source/util/src/tfile.c
+++ b/source/util/src/tfile.c
@@ -22,20 +22,26 @@
 
 static int32_t tsFileRsetId = -1;
 
+static int8_t tfInited = 0;
+
 static void tfCloseFile(void *p) {
   taosCloseFile((int32_t)(uintptr_t)p);
 }
 
 int32_t tfInit() {
+  int8_t old = atomic_val_compare_exchange_8(&tfInited, 0, 1);
+  if(old == 1) return 0;
   tsFileRsetId = taosOpenRef(2000, tfCloseFile);
   if (tsFileRsetId > 0) {
     return 0;
   } else {
+    atomic_store_8(&tfInited, 0);
     return -1;
   }
 }
 
 void tfCleanup() {
+  atomic_store_8(&tfInited, 0);
   if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId);
   tsFileRsetId = -1;
 }