From ca2e761a86b78e6e7595ce15966f20847903ce42 Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Wed, 11 Sep 2024 10:24:04 +0800 Subject: [PATCH 1/5] Add mutex while read wal --- include/util/tutil.h | 22 +++++++++++++++------- source/libs/wal/src/walRead.c | 8 +++++--- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/include/util/tutil.h b/include/util/tutil.h index 6321d2011a..5fa06b7e61 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -73,14 +73,14 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar char buf[TSDB_PASSWORD_LEN + 1]; buf[TSDB_PASSWORD_LEN] = 0; - (void)sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], context.digest[1], - context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], - context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], - context.digest[12], context.digest[13], context.digest[14], context.digest[15]); + (void)sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], + context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], + context.digest[6], context.digest[7], context.digest[8], context.digest[9], context.digest[10], + context.digest[11], context.digest[12], context.digest[13], context.digest[14], context.digest[15]); (void)memcpy(target, buf, TSDB_PASSWORD_LEN); } -static FORCE_INLINE int32_t taosHashBinary(char* pBuf, int32_t len) { +static FORCE_INLINE int32_t taosHashBinary(char *pBuf, int32_t len) { uint64_t hashVal = MurmurHash3_64(pBuf, len); return sprintf(pBuf, "%" PRIu64, hashVal); } @@ -161,8 +161,7 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, #define TCONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr)-offsetof(type, member))) -#define TAOS_GET_TERRNO(code) \ - (terrno == 0 ? code : terrno) +#define TAOS_GET_TERRNO(code) (terrno == 0 ? code : terrno) #define TAOS_RETURN(CODE) \ do { \ @@ -177,6 +176,15 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, } \ } while (0) +#define TAOS_CHECK_RETURN_WITH_MUTEX(CMD, MUTEX) \ + do { \ + int32_t __c = (CMD); \ + if (__c != TSDB_CODE_SUCCESS) { \ + taosThreadMutexUnlock(MUTEX); \ + TAOS_RETURN(__c); \ + } \ + } while (0) + #define TAOS_CHECK_GOTO(CMD, LINO, LABEL) \ do { \ code = (CMD); \ diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index deb5a07672..5568270efe 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -201,21 +201,23 @@ static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { // bsearch in fileSet SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; + taosThreadMutexLock(&pWal->mutex); SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); if (pRet == NULL) { wError("failed to find WAL log file with ver:%" PRId64, ver); - + taosThreadMutexUnlock(&pWal->mutex); TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } if (pReader->curFileFirstVer != pRet->firstVer) { // error code was set inner - TAOS_CHECK_RETURN(walReadChangeFile(pReader, pRet->firstVer)); + TAOS_CHECK_RETURN_WITH_MUTEX(walReadChangeFile(pReader, pRet->firstVer), &pWal->mutex); } // error code was set inner - TAOS_CHECK_RETURN(walReadSeekFilePos(pReader, pRet->firstVer, ver)); + TAOS_CHECK_RETURN_WITH_MUTEX(walReadSeekFilePos(pReader, pRet->firstVer, ver), &pWal->mutex); + taosThreadMutexUnlock(&pWal->mutex); wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver); pReader->curVersion = ver; From 96c59cc7462f0c57313cd08ac074ff3e67b27d54 Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Wed, 11 Sep 2024 13:47:43 +0800 Subject: [PATCH 2/5] reduce mutex lock usage time --- include/util/tutil.h | 14 +++++++------- source/libs/wal/src/walRead.c | 20 +++++++++++++------- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/include/util/tutil.h b/include/util/tutil.h index 5fa06b7e61..6a8f58e360 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -176,13 +176,13 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, } \ } while (0) -#define TAOS_CHECK_RETURN_WITH_MUTEX(CMD, MUTEX) \ - do { \ - int32_t __c = (CMD); \ - if (__c != TSDB_CODE_SUCCESS) { \ - taosThreadMutexUnlock(MUTEX); \ - TAOS_RETURN(__c); \ - } \ +#define TAOS_CHECK_RETURN_WITH_FREE(CMD, PTR) \ + do { \ + int32_t __c = (CMD); \ + if (__c != TSDB_CODE_SUCCESS) { \ + taosMemoryFree(PTR); \ + TAOS_RETURN(__c); \ + } \ } while (0) #define TAOS_CHECK_GOTO(CMD, LINO, LABEL) \ diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 5568270efe..1e9fcc70aa 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -202,22 +202,28 @@ static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; taosThreadMutexLock(&pWal->mutex); - SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - if (pRet == NULL) { + SWalFileInfo *gloablPRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + if (gloablPRet == NULL) { wError("failed to find WAL log file with ver:%" PRId64, ver); taosThreadMutexUnlock(&pWal->mutex); TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } - + SWalFileInfo *pRet = taosMemoryMalloc(sizeof(SWalFileInfo)); + if (pRet == NULL) { + wError("failed to allocate memory for localRet"); + taosThreadMutexUnlock(&pWal->mutex); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } + TAOS_MEMCPY(pRet, gloablPRet, sizeof(SWalFileInfo)); + taosThreadMutexUnlock(&pWal->mutex); if (pReader->curFileFirstVer != pRet->firstVer) { // error code was set inner - TAOS_CHECK_RETURN_WITH_MUTEX(walReadChangeFile(pReader, pRet->firstVer), &pWal->mutex); + TAOS_CHECK_RETURN_WITH_FREE(walReadChangeFile(pReader, pRet->firstVer), pRet); } // error code was set inner - TAOS_CHECK_RETURN_WITH_MUTEX(walReadSeekFilePos(pReader, pRet->firstVer, ver), &pWal->mutex); - - taosThreadMutexUnlock(&pWal->mutex); + TAOS_CHECK_RETURN_WITH_FREE(walReadSeekFilePos(pReader, pRet->firstVer, ver), pRet); + taosMemoryFree(pRet); wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver); pReader->curVersion = ver; From b49d9f369199fb69ce2028b6717b96e940bd2e4b Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Wed, 11 Sep 2024 19:48:50 +0800 Subject: [PATCH 3/5] change mutex lock in SWal to rwlock --- include/libs/wal/wal.h | 4 +-- source/libs/sync/src/syncMain.c | 12 +++---- source/libs/wal/src/walMeta.c | 4 +-- source/libs/wal/src/walMgmt.c | 24 +++++++------- source/libs/wal/src/walRead.c | 12 +++---- source/libs/wal/src/walRef.c | 14 ++++----- source/libs/wal/src/walWrite.c | 56 ++++++++++++++++----------------- 7 files changed, 63 insertions(+), 63 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index a5d5316d23..5a2cf3a3a0 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -112,8 +112,8 @@ typedef struct SWal { int64_t totSize; int64_t lastRollSeq; // ctl - int64_t refId; - TdThreadMutex mutex; + int64_t refId; + TdThreadRwlock mutex; // ref SHashObj *pRefHash; // refId -> SWalRef // path diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 8e709f8809..8081de60c9 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -3294,11 +3294,11 @@ _out:; // proceed match index, with replicating on needed SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append"); - if(pEntry != NULL) + if (pEntry != NULL) sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64 - ", %" PRId64 ")", - ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, - ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); + ", %" PRId64 ")", + ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, + ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { (void)syncNodeUpdateAssignedCommitIndex(ths, matchIndex); @@ -3449,8 +3449,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { pMsgReply->startTime = ths->startTime; pMsgReply->timeStamp = tsMs; - sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64, ths->vgId, - DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm); + sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64, + ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm); if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) { raftStoreSetTerm(ths, pMsg->term); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 0eb011113a..925524affb 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -684,7 +684,7 @@ _err: int64_t walGetVerRetention(SWal* pWal, int64_t bytes) { int64_t ver = -1; int64_t totSize = 0; - (void)taosThreadMutexLock(&pWal->mutex); + (void)taosThreadRwlockRdlock(&pWal->mutex); int32_t fileIdx = taosArrayGetSize(pWal->fileInfoSet); while (--fileIdx) { SWalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); @@ -694,7 +694,7 @@ int64_t walGetVerRetention(SWal* pWal, int64_t bytes) { } totSize += pInfo->fileSize; } - (void)taosThreadMutexUnlock(&pWal->mutex); + (void)taosThreadRwlockUnlock(&pWal->mutex); return ver + 1; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index e1d31ce113..31085e7919 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -21,11 +21,11 @@ #include "walInt.h" typedef struct { - int8_t stop; - int8_t inited; - uint32_t seq; - int32_t refSetId; - TdThread thread; + int8_t stop; + int8_t inited; + uint32_t seq; + int32_t refSetId; + TdThread thread; stopDnodeFn stopDnode; } SWalMgmt; @@ -88,7 +88,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { return NULL; } - if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) { + if (taosThreadRwlockInit(&pWal->mutex, NULL) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); taosMemoryFree(pWal); return NULL; @@ -179,7 +179,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { _err: taosArrayDestroy(pWal->fileInfoSet); taosHashCleanup(pWal->pRefHash); - TAOS_UNUSED(taosThreadMutexDestroy(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockDestroy(&pWal->mutex)); taosMemoryFreeClear(pWal); return NULL; @@ -215,15 +215,15 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { int32_t walPersist(SWal *pWal) { int32_t code = 0; - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); code = walSaveMeta(pWal); - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(code); } void walClose(SWal *pWal) { - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); (void)walSaveMeta(pWal); TAOS_UNUSED(taosCloseFile(&pWal->pLogFile)); pWal->pLogFile = NULL; @@ -243,7 +243,7 @@ void walClose(SWal *pWal) { } taosHashCleanup(pWal->pRefHash); pWal->pRefHash = NULL; - (void)taosThreadMutexUnlock(&pWal->mutex); + (void)taosThreadRwlockUnlock(&pWal->mutex); if (pWal->cfg.level == TAOS_WAL_SKIP) { wInfo("vgId:%d, remove all wals, path:%s", pWal->cfg.vgId, pWal->path); @@ -258,7 +258,7 @@ static void walFreeObj(void *wal) { SWal *pWal = wal; wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal); - (void)taosThreadMutexDestroy(&pWal->mutex); + (void)taosThreadRwlockDestroy(&pWal->mutex); taosMemoryFreeClear(pWal); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 1e9fcc70aa..e44e7038d0 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -115,9 +115,9 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *eve void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset) { // if offset version is small than first version , let's seek to first version - TAOS_UNUSED(taosThreadMutexLock(&pWalReader->pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockRdlock(&pWalReader->pWal->mutex)); int64_t firstVer = walGetFirstVer((pWalReader)->pWal); - TAOS_UNUSED(taosThreadMutexUnlock(&pWalReader->pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWalReader->pWal->mutex)); if (pOffset->version < firstVer) { pOffset->version = firstVer; @@ -201,21 +201,21 @@ static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { // bsearch in fileSet SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; - taosThreadMutexLock(&pWal->mutex); + TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex)); SWalFileInfo *gloablPRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); if (gloablPRet == NULL) { wError("failed to find WAL log file with ver:%" PRId64, ver); - taosThreadMutexUnlock(&pWal->mutex); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } SWalFileInfo *pRet = taosMemoryMalloc(sizeof(SWalFileInfo)); if (pRet == NULL) { wError("failed to allocate memory for localRet"); - taosThreadMutexUnlock(&pWal->mutex); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } TAOS_MEMCPY(pRet, gloablPRet, sizeof(SWalFileInfo)); - taosThreadMutexUnlock(&pWal->mutex); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); if (pReader->curFileFirstVer != pRet->firstVer) { // error code was set inner TAOS_CHECK_RETURN_WITH_FREE(walReadChangeFile(pReader, pRet->firstVer), pRet); diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index 579921a7e0..bf24ed89fb 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -61,34 +61,34 @@ int32_t walSetRefVer(SWalRef *pRef, int64_t ver) { SWal *pWal = pRef->pWal; wDebug("vgId:%d, wal ref version %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, ver, pRef->refId); if (pRef->refVer != ver) { - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } pRef->refVer = ver; - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); } TAOS_RETURN(TSDB_CODE_SUCCESS); } void walRefFirstVer(SWal *pWal, SWalRef *pRef) { - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex)); pRef->refVer = pWal->vers.firstVer; - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, pRef->refVer); } void walRefLastVer(SWal *pWal, SWalRef *pRef) { - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex)); pRef->refVer = pWal->vers.lastVer; - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, pRef->refVer); } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 52af3e8528..b3c5f30ba3 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -23,7 +23,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { int32_t code = 0; - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); wInfo("vgId:%d, restore from snapshot, version %" PRId64, pWal->cfg.vgId, ver); @@ -34,7 +34,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { SWalRef *pRef = *(SWalRef **)pIter; if (pRef->refVer != -1 && pRef->refVer <= ver) { taosHashCancelIterate(pWal->pRefHash, pIter); - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_FAILED); } @@ -51,7 +51,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { walBuildLogName(pWal, pFileInfo->firstVer, fnameStr); if (taosRemoveFile(fnameStr) < 0) { wError("vgId:%d, restore from snapshot, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr()); - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } @@ -60,7 +60,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); if (taosRemoveFile(fnameStr) < 0) { wError("vgId:%d, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr()); - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } @@ -81,7 +81,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { pWal->vers.snapshotVer = ver; pWal->vers.verInSnapshotting = -1; - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -160,12 +160,12 @@ static int64_t walChangeWrite(SWal *pWal, int64_t ver) { } int32_t walRollback(SWal *pWal, int64_t ver) { - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver); int64_t code; char fnameStr[WAL_FILE_LEN]; if (ver > pWal->vers.lastVer || ver <= pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } @@ -175,7 +175,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { // change current files code = walChangeWrite(pWal, ver); if (code < 0) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(code); } @@ -198,21 +198,21 @@ int32_t walRollback(SWal *pWal, int64_t ver) { TAOS_UNUSED(taosCloseFile(&pWal->pIdxFile)); TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); if (pIdxFile == NULL) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } int64_t idxOff = walGetVerIdxOffset(pWal, ver); code = taosLSeekFile(pIdxFile, idxOff, SEEK_SET); if (code < 0) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } // read idx file and get log file pos SWalIdxEntry entry; if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } @@ -223,14 +223,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) { wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, fnameStr); if (pLogFile == NULL) { // TODO - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } code = taosLSeekFile(pLogFile, entry.offset, SEEK_SET); if (code < 0) { // TODO - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } @@ -238,19 +238,19 @@ int32_t walRollback(SWal *pWal, int64_t ver) { SWalCkHead head; int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead)); if (size != sizeof(SWalCkHead)) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } code = walValidHeadCksum(&head); if (code != 0) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); } if (head.head.version != ver) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); } @@ -258,13 +258,13 @@ int32_t walRollback(SWal *pWal, int64_t ver) { // truncate old files code = taosFtruncateFile(pLogFile, entry.offset); if (code < 0) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } code = taosFtruncateFile(pIdxFile, idxOff); if (code < 0) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } @@ -278,13 +278,13 @@ int32_t walRollback(SWal *pWal, int64_t ver) { code = walSaveMeta(pWal); if (code < 0) { wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, terrstr()); - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(code); } // unlock - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -375,7 +375,7 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) { TAOS_RETURN(TSDB_CODE_FAILED); } - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); pWal->vers.verInSnapshotting = ver; pWal->vers.logRetention = logRetention; @@ -391,7 +391,7 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) { } _exit: - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(code); } @@ -399,7 +399,7 @@ _exit: int32_t walEndSnapshot(SWal *pWal) { int32_t code = 0, lino = 0; - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); int64_t ver = pWal->vers.verInSnapshotting; wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", log retention %" PRId64 " first ver %" PRId64 @@ -508,7 +508,7 @@ int32_t walEndSnapshot(SWal *pWal) { taosArrayClear(pWal->toDeleteFiles); _exit: - taosThreadMutexUnlock(&pWal->mutex); + taosThreadRwlockWrlock(&pWal->mutex); if (code) { wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code)); @@ -719,7 +719,7 @@ int32_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syn int32_t bodyLen) { int32_t code = 0, lino = 0; - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); if (index != pWal->vers.lastVer + 1) { TAOS_CHECK_GOTO(TSDB_CODE_WAL_INVALID_VER, &lino, _exit); @@ -738,7 +738,7 @@ _exit: wError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); return code; } @@ -749,7 +749,7 @@ int32_t walFsync(SWal *pWal, bool forceFsync) { return code; } - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal)); if (taosFsyncFile(pWal->pLogFile) < 0) { @@ -758,7 +758,7 @@ int32_t walFsync(SWal *pWal, bool forceFsync) { code = TAOS_SYSTEM_ERROR(errno); } } - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); return code; } From 12f9e268a676255c7efca006738858959c1d3c60 Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Sat, 14 Sep 2024 11:10:57 +0800 Subject: [PATCH 4/5] fix dead lock --- source/libs/wal/src/walWrite.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index b3c5f30ba3..6503ffdd02 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -508,7 +508,7 @@ int32_t walEndSnapshot(SWal *pWal) { taosArrayClear(pWal->toDeleteFiles); _exit: - taosThreadRwlockWrlock(&pWal->mutex); + taosThreadRwlockUnlock(&pWal->mutex); if (code) { wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code)); From 0a19bc974bd0ed13e8f49824ec11236d374c946f Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Sat, 14 Sep 2024 15:59:01 +0800 Subject: [PATCH 5/5] set wr lock to write priority --- source/libs/wal/src/walMgmt.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 31085e7919..3bb4b9d747 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -81,6 +81,15 @@ void walCleanUp() { } } +static int32_t walInitLock(SWal *pWal) { + TdThreadRwlockAttr attr; + (void)taosThreadRwlockAttrInit(&attr); + (void)taosThreadRwlockAttrSetKindNP(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); + (void)taosThreadRwlockInit(&pWal->mutex, &attr); + (void)taosThreadRwlockAttrDestroy(&attr); + return 0; +} + SWal *walOpen(const char *path, SWalCfg *pCfg) { SWal *pWal = taosMemoryCalloc(1, sizeof(SWal)); if (pWal == NULL) { @@ -88,7 +97,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { return NULL; } - if (taosThreadRwlockInit(&pWal->mutex, NULL) < 0) { + if (walInitLock(pWal) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); taosMemoryFree(pWal); return NULL;