diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index a5d5316d23..5a2cf3a3a0 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -112,8 +112,8 @@ typedef struct SWal { int64_t totSize; int64_t lastRollSeq; // ctl - int64_t refId; - TdThreadMutex mutex; + int64_t refId; + TdThreadRwlock mutex; // ref SHashObj *pRefHash; // refId -> SWalRef // path diff --git a/include/util/tutil.h b/include/util/tutil.h index 6321d2011a..6a8f58e360 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -73,14 +73,14 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar char buf[TSDB_PASSWORD_LEN + 1]; buf[TSDB_PASSWORD_LEN] = 0; - (void)sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], context.digest[1], - context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], - context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], - context.digest[12], context.digest[13], context.digest[14], context.digest[15]); + (void)sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], + context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], + context.digest[6], context.digest[7], context.digest[8], context.digest[9], context.digest[10], + context.digest[11], context.digest[12], context.digest[13], context.digest[14], context.digest[15]); (void)memcpy(target, buf, TSDB_PASSWORD_LEN); } -static FORCE_INLINE int32_t taosHashBinary(char* pBuf, int32_t len) { +static FORCE_INLINE int32_t taosHashBinary(char *pBuf, int32_t len) { uint64_t hashVal = MurmurHash3_64(pBuf, len); return sprintf(pBuf, "%" PRIu64, hashVal); } @@ -161,8 +161,7 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, #define TCONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr)-offsetof(type, member))) -#define TAOS_GET_TERRNO(code) \ - (terrno == 0 ? code : terrno) +#define TAOS_GET_TERRNO(code) (terrno == 0 ? code : terrno) #define TAOS_RETURN(CODE) \ do { \ @@ -177,6 +176,15 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, } \ } while (0) +#define TAOS_CHECK_RETURN_WITH_FREE(CMD, PTR) \ + do { \ + int32_t __c = (CMD); \ + if (__c != TSDB_CODE_SUCCESS) { \ + taosMemoryFree(PTR); \ + TAOS_RETURN(__c); \ + } \ + } while (0) + #define TAOS_CHECK_GOTO(CMD, LINO, LABEL) \ do { \ code = (CMD); \ diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 8e709f8809..8081de60c9 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -3294,11 +3294,11 @@ _out:; // proceed match index, with replicating on needed SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append"); - if(pEntry != NULL) + if (pEntry != NULL) sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64 - ", %" PRId64 ")", - ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, - ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); + ", %" PRId64 ")", + ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, + ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { (void)syncNodeUpdateAssignedCommitIndex(ths, matchIndex); @@ -3449,8 +3449,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { pMsgReply->startTime = ths->startTime; pMsgReply->timeStamp = tsMs; - sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64, ths->vgId, - DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm); + sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64, + ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm); if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) { raftStoreSetTerm(ths, pMsg->term); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 5f1ae02166..0dd55aacdb 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -684,7 +684,7 @@ _err: int64_t walGetVerRetention(SWal* pWal, int64_t bytes) { int64_t ver = -1; int64_t totSize = 0; - (void)taosThreadMutexLock(&pWal->mutex); + (void)taosThreadRwlockRdlock(&pWal->mutex); int32_t fileIdx = taosArrayGetSize(pWal->fileInfoSet); while (--fileIdx) { SWalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); @@ -694,7 +694,7 @@ int64_t walGetVerRetention(SWal* pWal, int64_t bytes) { } totSize += pInfo->fileSize; } - (void)taosThreadMutexUnlock(&pWal->mutex); + (void)taosThreadRwlockUnlock(&pWal->mutex); return ver + 1; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index e1d31ce113..3bb4b9d747 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -21,11 +21,11 @@ #include "walInt.h" typedef struct { - int8_t stop; - int8_t inited; - uint32_t seq; - int32_t refSetId; - TdThread thread; + int8_t stop; + int8_t inited; + uint32_t seq; + int32_t refSetId; + TdThread thread; stopDnodeFn stopDnode; } SWalMgmt; @@ -81,6 +81,15 @@ void walCleanUp() { } } +static int32_t walInitLock(SWal *pWal) { + TdThreadRwlockAttr attr; + (void)taosThreadRwlockAttrInit(&attr); + (void)taosThreadRwlockAttrSetKindNP(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); + (void)taosThreadRwlockInit(&pWal->mutex, &attr); + (void)taosThreadRwlockAttrDestroy(&attr); + return 0; +} + SWal *walOpen(const char *path, SWalCfg *pCfg) { SWal *pWal = taosMemoryCalloc(1, sizeof(SWal)); if (pWal == NULL) { @@ -88,7 +97,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { return NULL; } - if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) { + if (walInitLock(pWal) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); taosMemoryFree(pWal); return NULL; @@ -179,7 +188,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { _err: taosArrayDestroy(pWal->fileInfoSet); taosHashCleanup(pWal->pRefHash); - TAOS_UNUSED(taosThreadMutexDestroy(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockDestroy(&pWal->mutex)); taosMemoryFreeClear(pWal); return NULL; @@ -215,15 +224,15 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { int32_t walPersist(SWal *pWal) { int32_t code = 0; - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); code = walSaveMeta(pWal); - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(code); } void walClose(SWal *pWal) { - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); (void)walSaveMeta(pWal); TAOS_UNUSED(taosCloseFile(&pWal->pLogFile)); pWal->pLogFile = NULL; @@ -243,7 +252,7 @@ void walClose(SWal *pWal) { } taosHashCleanup(pWal->pRefHash); pWal->pRefHash = NULL; - (void)taosThreadMutexUnlock(&pWal->mutex); + (void)taosThreadRwlockUnlock(&pWal->mutex); if (pWal->cfg.level == TAOS_WAL_SKIP) { wInfo("vgId:%d, remove all wals, path:%s", pWal->cfg.vgId, pWal->path); @@ -258,7 +267,7 @@ static void walFreeObj(void *wal) { SWal *pWal = wal; wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal); - (void)taosThreadMutexDestroy(&pWal->mutex); + (void)taosThreadRwlockDestroy(&pWal->mutex); taosMemoryFreeClear(pWal); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index aed7db327f..321a47d678 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -115,9 +115,9 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *eve void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset) { // if offset version is small than first version , let's seek to first version - TAOS_UNUSED(taosThreadMutexLock(&pWalReader->pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockRdlock(&pWalReader->pWal->mutex)); int64_t firstVer = walGetFirstVer((pWalReader)->pWal); - TAOS_UNUSED(taosThreadMutexUnlock(&pWalReader->pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWalReader->pWal->mutex)); if (pOffset->version < firstVer) { pOffset->version = firstVer; @@ -201,21 +201,29 @@ static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { // bsearch in fileSet SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; - SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - if (pRet == NULL) { + TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex)); + SWalFileInfo *gloablPRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + if (gloablPRet == NULL) { wError("failed to find WAL log file with ver:%" PRId64, ver); - + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } - + SWalFileInfo *pRet = taosMemoryMalloc(sizeof(SWalFileInfo)); + if (pRet == NULL) { + wError("failed to allocate memory for localRet"); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } + TAOS_MEMCPY(pRet, gloablPRet, sizeof(SWalFileInfo)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); if (pReader->curFileFirstVer != pRet->firstVer) { // error code was set inner - TAOS_CHECK_RETURN(walReadChangeFile(pReader, pRet->firstVer)); + TAOS_CHECK_RETURN_WITH_FREE(walReadChangeFile(pReader, pRet->firstVer), pRet); } // error code was set inner - TAOS_CHECK_RETURN(walReadSeekFilePos(pReader, pRet->firstVer, ver)); - + TAOS_CHECK_RETURN_WITH_FREE(walReadSeekFilePos(pReader, pRet->firstVer, ver), pRet); + taosMemoryFree(pRet); wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver); pReader->curVersion = ver; diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index 579921a7e0..bf24ed89fb 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -61,34 +61,34 @@ int32_t walSetRefVer(SWalRef *pRef, int64_t ver) { SWal *pWal = pRef->pWal; wDebug("vgId:%d, wal ref version %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, ver, pRef->refId); if (pRef->refVer != ver) { - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } pRef->refVer = ver; - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); } TAOS_RETURN(TSDB_CODE_SUCCESS); } void walRefFirstVer(SWal *pWal, SWalRef *pRef) { - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex)); pRef->refVer = pWal->vers.firstVer; - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, pRef->refVer); } void walRefLastVer(SWal *pWal, SWalRef *pRef) { - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex)); pRef->refVer = pWal->vers.lastVer; - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, pRef->refVer); } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 224a7dc711..a5105fc107 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -23,7 +23,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { int32_t code = 0; - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); wInfo("vgId:%d, restore from snapshot, version %" PRId64, pWal->cfg.vgId, ver); @@ -34,7 +34,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { SWalRef *pRef = *(SWalRef **)pIter; if (pRef->refVer != -1 && pRef->refVer <= ver) { taosHashCancelIterate(pWal->pRefHash, pIter); - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_FAILED); } @@ -51,7 +51,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { walBuildLogName(pWal, pFileInfo->firstVer, fnameStr); if (taosRemoveFile(fnameStr) < 0) { wError("vgId:%d, restore from snapshot, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr()); - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(terrno); } @@ -60,7 +60,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); if (taosRemoveFile(fnameStr) < 0) { wError("vgId:%d, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr()); - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(terrno); } @@ -81,7 +81,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { pWal->vers.snapshotVer = ver; pWal->vers.verInSnapshotting = -1; - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -160,12 +160,12 @@ static int64_t walChangeWrite(SWal *pWal, int64_t ver) { } int32_t walRollback(SWal *pWal, int64_t ver) { - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver); int64_t code; char fnameStr[WAL_FILE_LEN]; if (ver > pWal->vers.lastVer || ver <= pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } @@ -175,7 +175,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { // change current files code = walChangeWrite(pWal, ver); if (code < 0) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(code); } @@ -198,21 +198,21 @@ int32_t walRollback(SWal *pWal, int64_t ver) { TAOS_UNUSED(taosCloseFile(&pWal->pIdxFile)); TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); if (pIdxFile == NULL) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(terrno); } int64_t idxOff = walGetVerIdxOffset(pWal, ver); code = taosLSeekFile(pIdxFile, idxOff, SEEK_SET); if (code < 0) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(terrno); } // read idx file and get log file pos SWalIdxEntry entry; if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(terrno); } @@ -223,14 +223,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) { wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, fnameStr); if (pLogFile == NULL) { // TODO - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(terrno); } code = taosLSeekFile(pLogFile, entry.offset, SEEK_SET); if (code < 0) { // TODO - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(terrno); } @@ -238,19 +238,19 @@ int32_t walRollback(SWal *pWal, int64_t ver) { SWalCkHead head; int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead)); if (size != sizeof(SWalCkHead)) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(terrno); } code = walValidHeadCksum(&head); if (code != 0) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); } if (head.head.version != ver) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); } @@ -258,13 +258,13 @@ int32_t walRollback(SWal *pWal, int64_t ver) { // truncate old files code = taosFtruncateFile(pLogFile, entry.offset); if (code < 0) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(code); } code = taosFtruncateFile(pIdxFile, idxOff); if (code < 0) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(code); } @@ -278,13 +278,13 @@ int32_t walRollback(SWal *pWal, int64_t ver) { code = walSaveMeta(pWal); if (code < 0) { wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, terrstr()); - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(code); } // unlock - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -375,7 +375,7 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) { TAOS_RETURN(TSDB_CODE_FAILED); } - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); pWal->vers.verInSnapshotting = ver; pWal->vers.logRetention = logRetention; @@ -391,7 +391,7 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) { } _exit: - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(code); } @@ -399,7 +399,7 @@ _exit: int32_t walEndSnapshot(SWal *pWal) { int32_t code = 0, lino = 0; - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); int64_t ver = pWal->vers.verInSnapshotting; wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", log retention %" PRId64 " first ver %" PRId64 @@ -508,7 +508,7 @@ int32_t walEndSnapshot(SWal *pWal) { taosArrayClear(pWal->toDeleteFiles); _exit: - taosThreadMutexUnlock(&pWal->mutex); + taosThreadRwlockUnlock(&pWal->mutex); if (code) { wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code)); @@ -717,7 +717,7 @@ int32_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syn int32_t bodyLen) { int32_t code = 0, lino = 0; - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); if (index != pWal->vers.lastVer + 1) { TAOS_CHECK_GOTO(TSDB_CODE_WAL_INVALID_VER, &lino, _exit); @@ -736,7 +736,7 @@ _exit: wError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); return code; } @@ -747,7 +747,7 @@ int32_t walFsync(SWal *pWal, bool forceFsync) { return code; } - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal)); if (taosFsyncFile(pWal->pLogFile) < 0) { @@ -756,7 +756,7 @@ int32_t walFsync(SWal *pWal, bool forceFsync) { code = terrno; } } - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); return code; }