diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index a5d5316d23..5a2cf3a3a0 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -112,8 +112,8 @@ typedef struct SWal { int64_t totSize; int64_t lastRollSeq; // ctl - int64_t refId; - TdThreadMutex mutex; + int64_t refId; + TdThreadRwlock mutex; // ref SHashObj *pRefHash; // refId -> SWalRef // path diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 8e709f8809..8081de60c9 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -3294,11 +3294,11 @@ _out:; // proceed match index, with replicating on needed SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append"); - if(pEntry != NULL) + if (pEntry != NULL) sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64 - ", %" PRId64 ")", - ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, - ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); + ", %" PRId64 ")", + ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, + ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { (void)syncNodeUpdateAssignedCommitIndex(ths, matchIndex); @@ -3449,8 +3449,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { pMsgReply->startTime = ths->startTime; pMsgReply->timeStamp = tsMs; - sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64, ths->vgId, - DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm); + sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64, + ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm); if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) { raftStoreSetTerm(ths, pMsg->term); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 0eb011113a..925524affb 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -684,7 +684,7 @@ _err: int64_t walGetVerRetention(SWal* pWal, int64_t bytes) { int64_t ver = -1; int64_t totSize = 0; - (void)taosThreadMutexLock(&pWal->mutex); + (void)taosThreadRwlockRdlock(&pWal->mutex); int32_t fileIdx = taosArrayGetSize(pWal->fileInfoSet); while (--fileIdx) { SWalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); @@ -694,7 +694,7 @@ int64_t walGetVerRetention(SWal* pWal, int64_t bytes) { } totSize += pInfo->fileSize; } - (void)taosThreadMutexUnlock(&pWal->mutex); + (void)taosThreadRwlockUnlock(&pWal->mutex); return ver + 1; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index e1d31ce113..31085e7919 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -21,11 +21,11 @@ #include "walInt.h" typedef struct { - int8_t stop; - int8_t inited; - uint32_t seq; - int32_t refSetId; - TdThread thread; + int8_t stop; + int8_t inited; + uint32_t seq; + int32_t refSetId; + TdThread thread; stopDnodeFn stopDnode; } SWalMgmt; @@ -88,7 +88,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { return NULL; } - if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) { + if (taosThreadRwlockInit(&pWal->mutex, NULL) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); taosMemoryFree(pWal); return NULL; @@ -179,7 +179,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { _err: taosArrayDestroy(pWal->fileInfoSet); taosHashCleanup(pWal->pRefHash); - TAOS_UNUSED(taosThreadMutexDestroy(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockDestroy(&pWal->mutex)); taosMemoryFreeClear(pWal); return NULL; @@ -215,15 +215,15 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { int32_t walPersist(SWal *pWal) { int32_t code = 0; - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); code = walSaveMeta(pWal); - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(code); } void walClose(SWal *pWal) { - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); (void)walSaveMeta(pWal); TAOS_UNUSED(taosCloseFile(&pWal->pLogFile)); pWal->pLogFile = NULL; @@ -243,7 +243,7 @@ void walClose(SWal *pWal) { } taosHashCleanup(pWal->pRefHash); pWal->pRefHash = NULL; - (void)taosThreadMutexUnlock(&pWal->mutex); + (void)taosThreadRwlockUnlock(&pWal->mutex); if (pWal->cfg.level == TAOS_WAL_SKIP) { wInfo("vgId:%d, remove all wals, path:%s", pWal->cfg.vgId, pWal->path); @@ -258,7 +258,7 @@ static void walFreeObj(void *wal) { SWal *pWal = wal; wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal); - (void)taosThreadMutexDestroy(&pWal->mutex); + (void)taosThreadRwlockDestroy(&pWal->mutex); taosMemoryFreeClear(pWal); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 1e9fcc70aa..e44e7038d0 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -115,9 +115,9 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *eve void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset) { // if offset version is small than first version , let's seek to first version - TAOS_UNUSED(taosThreadMutexLock(&pWalReader->pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockRdlock(&pWalReader->pWal->mutex)); int64_t firstVer = walGetFirstVer((pWalReader)->pWal); - TAOS_UNUSED(taosThreadMutexUnlock(&pWalReader->pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWalReader->pWal->mutex)); if (pOffset->version < firstVer) { pOffset->version = firstVer; @@ -201,21 +201,21 @@ static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { // bsearch in fileSet SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; - taosThreadMutexLock(&pWal->mutex); + TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex)); SWalFileInfo *gloablPRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); if (gloablPRet == NULL) { wError("failed to find WAL log file with ver:%" PRId64, ver); - taosThreadMutexUnlock(&pWal->mutex); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } SWalFileInfo *pRet = taosMemoryMalloc(sizeof(SWalFileInfo)); if (pRet == NULL) { wError("failed to allocate memory for localRet"); - taosThreadMutexUnlock(&pWal->mutex); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } TAOS_MEMCPY(pRet, gloablPRet, sizeof(SWalFileInfo)); - taosThreadMutexUnlock(&pWal->mutex); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); if (pReader->curFileFirstVer != pRet->firstVer) { // error code was set inner TAOS_CHECK_RETURN_WITH_FREE(walReadChangeFile(pReader, pRet->firstVer), pRet); diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index 579921a7e0..bf24ed89fb 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -61,34 +61,34 @@ int32_t walSetRefVer(SWalRef *pRef, int64_t ver) { SWal *pWal = pRef->pWal; wDebug("vgId:%d, wal ref version %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, ver, pRef->refId); if (pRef->refVer != ver) { - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } pRef->refVer = ver; - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); } TAOS_RETURN(TSDB_CODE_SUCCESS); } void walRefFirstVer(SWal *pWal, SWalRef *pRef) { - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex)); pRef->refVer = pWal->vers.firstVer; - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, pRef->refVer); } void walRefLastVer(SWal *pWal, SWalRef *pRef) { - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex)); pRef->refVer = pWal->vers.lastVer; - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, pRef->refVer); } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 52af3e8528..b3c5f30ba3 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -23,7 +23,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { int32_t code = 0; - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); wInfo("vgId:%d, restore from snapshot, version %" PRId64, pWal->cfg.vgId, ver); @@ -34,7 +34,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { SWalRef *pRef = *(SWalRef **)pIter; if (pRef->refVer != -1 && pRef->refVer <= ver) { taosHashCancelIterate(pWal->pRefHash, pIter); - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_FAILED); } @@ -51,7 +51,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { walBuildLogName(pWal, pFileInfo->firstVer, fnameStr); if (taosRemoveFile(fnameStr) < 0) { wError("vgId:%d, restore from snapshot, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr()); - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } @@ -60,7 +60,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); if (taosRemoveFile(fnameStr) < 0) { wError("vgId:%d, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr()); - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } @@ -81,7 +81,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { pWal->vers.snapshotVer = ver; pWal->vers.verInSnapshotting = -1; - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -160,12 +160,12 @@ static int64_t walChangeWrite(SWal *pWal, int64_t ver) { } int32_t walRollback(SWal *pWal, int64_t ver) { - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver); int64_t code; char fnameStr[WAL_FILE_LEN]; if (ver > pWal->vers.lastVer || ver <= pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } @@ -175,7 +175,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { // change current files code = walChangeWrite(pWal, ver); if (code < 0) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(code); } @@ -198,21 +198,21 @@ int32_t walRollback(SWal *pWal, int64_t ver) { TAOS_UNUSED(taosCloseFile(&pWal->pIdxFile)); TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); if (pIdxFile == NULL) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } int64_t idxOff = walGetVerIdxOffset(pWal, ver); code = taosLSeekFile(pIdxFile, idxOff, SEEK_SET); if (code < 0) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } // read idx file and get log file pos SWalIdxEntry entry; if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } @@ -223,14 +223,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) { wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, fnameStr); if (pLogFile == NULL) { // TODO - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } code = taosLSeekFile(pLogFile, entry.offset, SEEK_SET); if (code < 0) { // TODO - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } @@ -238,19 +238,19 @@ int32_t walRollback(SWal *pWal, int64_t ver) { SWalCkHead head; int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead)); if (size != sizeof(SWalCkHead)) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } code = walValidHeadCksum(&head); if (code != 0) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); } if (head.head.version != ver) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); } @@ -258,13 +258,13 @@ int32_t walRollback(SWal *pWal, int64_t ver) { // truncate old files code = taosFtruncateFile(pLogFile, entry.offset); if (code < 0) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } code = taosFtruncateFile(pIdxFile, idxOff); if (code < 0) { - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } @@ -278,13 +278,13 @@ int32_t walRollback(SWal *pWal, int64_t ver) { code = walSaveMeta(pWal); if (code < 0) { wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, terrstr()); - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(code); } // unlock - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -375,7 +375,7 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) { TAOS_RETURN(TSDB_CODE_FAILED); } - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); pWal->vers.verInSnapshotting = ver; pWal->vers.logRetention = logRetention; @@ -391,7 +391,7 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) { } _exit: - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(code); } @@ -399,7 +399,7 @@ _exit: int32_t walEndSnapshot(SWal *pWal) { int32_t code = 0, lino = 0; - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); int64_t ver = pWal->vers.verInSnapshotting; wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", log retention %" PRId64 " first ver %" PRId64 @@ -508,7 +508,7 @@ int32_t walEndSnapshot(SWal *pWal) { taosArrayClear(pWal->toDeleteFiles); _exit: - taosThreadMutexUnlock(&pWal->mutex); + taosThreadRwlockWrlock(&pWal->mutex); if (code) { wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code)); @@ -719,7 +719,7 @@ int32_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syn int32_t bodyLen) { int32_t code = 0, lino = 0; - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); if (index != pWal->vers.lastVer + 1) { TAOS_CHECK_GOTO(TSDB_CODE_WAL_INVALID_VER, &lino, _exit); @@ -738,7 +738,7 @@ _exit: wError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); return code; } @@ -749,7 +749,7 @@ int32_t walFsync(SWal *pWal, bool forceFsync) { return code; } - TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal)); if (taosFsyncFile(pWal->pLogFile) < 0) { @@ -758,7 +758,7 @@ int32_t walFsync(SWal *pWal, bool forceFsync) { code = TAOS_SYSTEM_ERROR(errno); } } - TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex)); + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); return code; }