From 43a2bcdce327a59db9895d707911d77573e8e5d9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 14 Dec 2021 14:38:52 +0800 Subject: [PATCH] fix wal read handle --- include/libs/wal/wal.h | 58 +++++++++++------------- source/libs/wal/inc/walInt.h | 6 ++- source/libs/wal/src/walMgmt.c | 3 +- source/libs/wal/src/walRead.c | 52 +++++++++++++-------- source/libs/wal/src/walWrite.c | 7 +-- source/libs/wal/test/walMetaTest.cpp | 67 ++++++++++++++++++++-------- 6 files changed, 118 insertions(+), 75 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index ae1e630c6f..744275e6ff 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -32,6 +32,23 @@ extern int32_t wDebugFlag; #define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} +#define WAL_PREFIX "wal" +#define WAL_PREFIX_LEN 3 +#define WAL_NOSUFFIX_LEN 20 +#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1) +#define WAL_LOG_SUFFIX "log" +#define WAL_INDEX_SUFFIX "idx" +#define WAL_REFRESH_MS 1000 +#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16) +#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) +#define WAL_FILE_LEN (WAL_PATH_LEN + 32) + +#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2) +#define WAL_CUR_POS_WRITABLE 1 +#define WAL_CUR_FILE_WRITABLE 2 +#define WAL_CUR_FAILED 4 + +#pragma pack(push,1) typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, @@ -43,6 +60,7 @@ typedef struct SWalReadHead { uint8_t msgType; int8_t reserved[2]; int32_t len; + //int64_t ingestTs; //not implemented int64_t version; char body[]; } SWalReadHead; @@ -71,25 +89,6 @@ typedef struct { SWalReadHead head; } SWalHead; -#define WAL_PREFIX "wal" -#define WAL_PREFIX_LEN 3 -#define WAL_NOSUFFIX_LEN 20 -#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1) -#define WAL_LOG_SUFFIX "log" -#define WAL_INDEX_SUFFIX "idx" -#define WAL_REFRESH_MS 1000 -#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16) -#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFEUL)) -#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) -#define WAL_FILE_LEN (WAL_PATH_LEN + 32) -//#define WAL_FILE_NUM 1 // 3 -#define WAL_FILESET_MAX 128 - -#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2) -#define WAL_CUR_POS_WRITABLE 1 -#define WAL_CUR_FILE_WRITABLE 2 -#define WAL_CUR_FAILED 4 - typedef struct SWalVer { int64_t firstVer; int64_t verInSnapshotting; @@ -101,24 +100,18 @@ typedef struct SWalVer { typedef struct SWal { // cfg SWalCfg cfg; - //total size - int64_t totSize; - //fsync seq - int32_t fsyncSeq; - //reference - int64_t refId; - //write tfd - int64_t writeLogTfd; - int64_t writeIdxTfd; - //wal lifecycle SWalVer vers; - //roll status - int64_t lastRollSeq; //file set int32_t writeCur; + int64_t writeLogTfd; + int64_t writeIdxTfd; SArray* fileInfoSet; //ctl int32_t curStatus; + int32_t fsyncSeq; + int64_t totSize; + int64_t refId; + int64_t lastRollSeq; pthread_mutex_t mutex; //path char path[WAL_PATH_LEN]; @@ -134,8 +127,9 @@ typedef struct SWalReadHandle { int64_t curVersion; int64_t capacity; int64_t status; //if cursor valid - SWalHead head; + SWalHead* pHead; } SWalReadHandle; +#pragma pack(pop) typedef int32_t (*FWalWrite)(void *ahandle, void *pHead); diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index ec01f7d7fc..e546a87326 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -33,10 +33,12 @@ typedef struct WalFileInfo { int64_t fileSize; } WalFileInfo; +#pragma pack(push,1) typedef struct WalIdxEntry { int64_t ver; int64_t offset; } WalIdxEntry; +#pragma pack(pop) static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) { WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft; @@ -78,11 +80,11 @@ static inline WalFileInfo* walGetCurFileInfo(SWal* pWal) { } static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) { - return sprintf(buf, "%s/%" PRId64 "." WAL_LOG_SUFFIX, pWal->path, fileFirstVer); + return sprintf(buf, "%s/%020" PRId64 "." WAL_LOG_SUFFIX, pWal->path, fileFirstVer); } static inline int walBuildIdxName(SWal*pWal, int64_t fileFirstVer, char* buf) { - return sprintf(buf, "%s/%" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer); + return sprintf(buf, "%s/%020" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer); } static inline int walValidHeadCksum(SWalHead* pHead) { diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 7c100b4883..629451a722 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -255,9 +255,8 @@ static int32_t walCreateThread() { static void walStopThread() { atomic_store_8(&tsWal.stop, 1); - if (tsWal.thread != NULL && taosCheckPthreadValid(tsWal.thread)) { + if (taosCheckPthreadValid(tsWal.thread)) { pthread_join(tsWal.thread, NULL); - tsWal.thread = NULL; } wDebug("wal thread is stopped"); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 554a5c846b..b6aafedea3 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -21,16 +21,25 @@ SWalReadHandle* walOpenReadHandle(SWal* pWal) { if(pRead == NULL) { return NULL; } - memset(pRead, 0, sizeof(SWalReadHandle)); pRead->pWal = pWal; pRead->readIdxTfd = -1; pRead->readLogTfd = -1; - return NULL; + pRead->curVersion = -1; + pRead->curFileFirstVer = -1; + pRead->capacity = 0; + pRead->status = 0; + pRead->pHead = malloc(sizeof(SWalHead)); + if(pRead->pHead == NULL) { + free(pRead); + return NULL; + } + return pRead; } void walCloseReadHandle(SWalReadHandle *pRead) { tfClose(pRead->readIdxTfd); tfClose(pRead->readLogTfd); + tfree(pRead->pHead); free(pRead); } @@ -47,18 +56,17 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i //seek position int64_t offset = (ver - fileFirstVer) * WAL_IDX_ENTRY_SIZE; code = tfLseek(idxTfd, offset, SEEK_SET); - if(code != 0) { + if(code < 0) { return -1; } WalIdxEntry entry; - code = tfRead(idxTfd, &entry, sizeof(WalIdxEntry)); - if(code != 0) { + if(tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) { return -1; } //TODO:deserialize ASSERT(entry.ver == ver); code = tfLseek(logTfd, entry.offset, SEEK_SET); - if (code != 0) { + if (code < 0) { return -1; } return code; @@ -71,13 +79,13 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { tfClose(pRead->readLogTfd); walBuildLogName(pRead->pWal, fileFirstVer, fnameStr); - int logTfd = tfOpenRead(fnameStr); + int64_t logTfd = tfOpenRead(fnameStr); if(logTfd < 0) { return -1; } walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr); - int idxTfd = tfOpenRead(fnameStr); + int64_t idxTfd = tfOpenRead(fnameStr); if(idxTfd < 0) { return -1; } @@ -90,7 +98,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { int code; SWal *pWal = pRead->pWal; - if(ver == pWal->vers.lastVer) { + if(ver == pRead->curVersion) { return 0; } if(ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { @@ -126,33 +134,41 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { int code; //TODO: check wal life if(pRead->curVersion != ver) { - walReadSeekVer(pRead, ver); + code = walReadSeekVer(pRead, ver); + if(code != 0) { + return -1; + } } if(!tfValid(pRead->readLogTfd)) return -1; - if(sizeof(SWalHead) != tfRead(pRead->readLogTfd, &pRead->head, sizeof(SWalHead))) { + code = tfRead(pRead->readLogTfd, pRead->pHead, sizeof(SWalHead)); + if(code != sizeof(SWalHead)) { return -1; } - code = walValidHeadCksum(&pRead->head); + code = walValidHeadCksum(pRead->pHead); if(code != 0) { return -1; } - if(pRead->capacity < pRead->head.head.len) { - void* ptr = realloc(pRead, pRead->head.head.len); + if(pRead->capacity < pRead->pHead->head.len) { + void* ptr = realloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len); if(ptr == NULL) { return -1; } - pRead = ptr; - pRead->capacity = pRead->head.head.len; + pRead->pHead = ptr; + pRead->capacity = pRead->pHead->head.len; } - if(pRead->head.head.len != tfRead(pRead->readLogTfd, &pRead->head.head.body, pRead->head.head.len)) { + if(pRead->pHead->head.len != tfRead(pRead->readLogTfd, pRead->pHead->head.body, pRead->pHead->head.len)) { return -1; } - code = walValidBodyCksum(&pRead->head); + + /*code = walValidBodyCksum(pRead->pHead);*/ + ASSERT(pRead->pHead->head.version == ver); + if(code != 0) { return -1; } + pRead->curVersion++; return 0; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 44e8cec153..994b8fc333 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -377,11 +377,12 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i //must truncate explicitly first return -1; } - /*if (!tfValid(pWal->curLogTfd)) return 0;*/ + /*if (!tfValid(pWal->writeLogTfd)) return -1;*/ pthread_mutex_lock(&pWal->mutex); pWal->writeHead.head.version = index; + int64_t offset = walGetCurFileOffset(pWal); pWal->writeHead.head.len = bodyLen; pWal->writeHead.head.msgType = msgType; pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); @@ -393,12 +394,12 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); } - if (tfWrite(pWal->writeLogTfd, &body, bodyLen) != bodyLen) { + if (tfWrite(pWal->writeLogTfd, (char*)body, bodyLen) != bodyLen) { //ftruncate code = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); } - code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal)); + code = walWriteIndex(pWal, index, offset); if(code != 0) { //TODO return -1; diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index 504f1ada3f..200bf39c5a 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -5,6 +5,9 @@ #include "walInt.h" +const char* ranStr = "tvapq02tcp"; +const int ranStrLen = strlen(ranStr); + class WalCleanEnv : public ::testing::Test { protected: static void SetUpTestCase() { @@ -157,15 +160,13 @@ TEST_F(WalCleanEnv, removeOldMeta) { TEST_F(WalKeepEnv, readOldMeta) { walResetEnv(); - const char* ranStr = "tvapq02tcp"; - int len = strlen(ranStr); int code; for(int i = 0; i < 10; i++) { - code = walWrite(pWal, i, i+1, (void*)ranStr, len); + code = walWrite(pWal, i, i+1, (void*)ranStr, ranStrLen); ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, i); - code = walWrite(pWal, i+2, i, (void*)ranStr, len); + code = walWrite(pWal, i+2, i, (void*)ranStr, ranStrLen); ASSERT_EQ(code, -1); ASSERT_EQ(pWal->vers.lastVer, i); } @@ -179,7 +180,7 @@ TEST_F(WalKeepEnv, readOldMeta) { char* newss = walMetaSerialize(pWal); - len = strlen(oldss); + int len = strlen(oldss); ASSERT_EQ(len, strlen(newss)); for(int i = 0; i < len; i++) { EXPECT_EQ(oldss[i], newss[i]); @@ -189,14 +190,12 @@ TEST_F(WalKeepEnv, readOldMeta) { } TEST_F(WalCleanEnv, write) { - const char* ranStr = "tvapq02tcp"; - const int len = strlen(ranStr); int code; for(int i = 0; i < 10; i++) { - code = walWrite(pWal, i, i+1, (void*)ranStr, len); + code = walWrite(pWal, i, i+1, (void*)ranStr, ranStrLen); ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, i); - code = walWrite(pWal, i+2, i, (void*)ranStr, len); + code = walWrite(pWal, i+2, i, (void*)ranStr, ranStrLen); ASSERT_EQ(code, -1); ASSERT_EQ(pWal->vers.lastVer, i); } @@ -205,11 +204,9 @@ TEST_F(WalCleanEnv, write) { } TEST_F(WalCleanEnv, rollback) { - const char* ranStr = "tvapq02tcp"; - const int len = strlen(ranStr); int code; for(int i = 0; i < 10; i++) { - code = walWrite(pWal, i, i+1, (void*)ranStr, len); + code = walWrite(pWal, i, i+1, (void*)ranStr, ranStrLen); ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, i); } @@ -224,12 +221,10 @@ TEST_F(WalCleanEnv, rollback) { } TEST_F(WalCleanDeleteEnv, roll) { - const char* ranStr = "tvapq02tcp"; - const int len = strlen(ranStr); int code; int i; for(i = 0; i < 100; i++) { - code = walWrite(pWal, i, 0, (void*)ranStr, len); + code = walWrite(pWal, i, 0, (void*)ranStr, ranStrLen); ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, i); code = walCommit(pWal, i); @@ -242,19 +237,55 @@ TEST_F(WalCleanDeleteEnv, roll) { ASSERT_EQ(pWal->vers.snapshotVer, i-1); ASSERT_EQ(pWal->vers.verInSnapshotting, -1); - code = walWrite(pWal, 5, 0, (void*)ranStr, len); + code = walWrite(pWal, 5, 0, (void*)ranStr, ranStrLen); ASSERT_NE(code, 0); for(; i < 200; i++) { - code = walWrite(pWal, i, 0, (void*)ranStr, len); + code = walWrite(pWal, i, 0, (void*)ranStr, ranStrLen); ASSERT_EQ(code, 0); code = walCommit(pWal, i); ASSERT_EQ(pWal->vers.commitVer, i); } - //code = walWriteMeta(pWal); code = walBeginTakeSnapshot(pWal, i - 1); ASSERT_EQ(code, 0); code = walEndTakeSnapshot(pWal); ASSERT_EQ(code, 0); } + +TEST_F(WalKeepEnv, readHandleRead) { + walResetEnv(); + int code; + SWalReadHandle* pRead = walOpenReadHandle(pWal); + ASSERT(pRead != NULL); + + int i ; + for(i = 0; i < 100; i++) { + char newStr[100]; + sprintf(newStr, "%s-%d", ranStr, i); + int len = strlen(newStr); + code = walWrite(pWal, i, 0, newStr, len); + ASSERT_EQ(code, 0); + } + for(int i = 0; i < 1000; i++) { + int ver = rand() % 100; + code = walReadWithHandle(pRead, ver); + ASSERT_EQ(code, 0); + + //printf("rrbody: \n"); + //for(int i = 0; i < pRead->pHead->head.len; i++) { + //printf("%d ", pRead->pHead->head.body[i]); + //} + //printf("\n"); + + ASSERT_EQ(pRead->pHead->head.version, ver); + ASSERT_EQ(pRead->curVersion, ver+1); + char newStr[100]; + sprintf(newStr, "%s-%d", ranStr, ver); + int len = strlen(newStr); + ASSERT_EQ(pRead->pHead->head.len, len); + for(int j = 0; j < len; j++) { + EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); + } + } +}