change mutex lock in SWal to rwlock

This commit is contained in:
xiao-77 2024-09-11 19:48:50 +08:00
parent 96c59cc746
commit b49d9f3691
7 changed files with 63 additions and 63 deletions

View File

@ -112,8 +112,8 @@ typedef struct SWal {
int64_t totSize;
int64_t lastRollSeq;
// ctl
int64_t refId;
TdThreadMutex mutex;
int64_t refId;
TdThreadRwlock mutex;
// ref
SHashObj *pRefHash; // refId -> SWalRef
// path

View File

@ -3294,11 +3294,11 @@ _out:;
// proceed match index, with replicating on needed
SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append");
if(pEntry != NULL)
if (pEntry != NULL)
sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
", %" PRId64 ")",
ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
", %" PRId64 ")",
ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
(void)syncNodeUpdateAssignedCommitIndex(ths, matchIndex);
@ -3449,8 +3449,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pMsgReply->startTime = ths->startTime;
pMsgReply->timeStamp = tsMs;
sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64, ths->vgId,
DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);
sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64,
ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);
if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
raftStoreSetTerm(ths, pMsg->term);

View File

@ -684,7 +684,7 @@ _err:
int64_t walGetVerRetention(SWal* pWal, int64_t bytes) {
int64_t ver = -1;
int64_t totSize = 0;
(void)taosThreadMutexLock(&pWal->mutex);
(void)taosThreadRwlockRdlock(&pWal->mutex);
int32_t fileIdx = taosArrayGetSize(pWal->fileInfoSet);
while (--fileIdx) {
SWalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
@ -694,7 +694,7 @@ int64_t walGetVerRetention(SWal* pWal, int64_t bytes) {
}
totSize += pInfo->fileSize;
}
(void)taosThreadMutexUnlock(&pWal->mutex);
(void)taosThreadRwlockUnlock(&pWal->mutex);
return ver + 1;
}

View File

@ -21,11 +21,11 @@
#include "walInt.h"
typedef struct {
int8_t stop;
int8_t inited;
uint32_t seq;
int32_t refSetId;
TdThread thread;
int8_t stop;
int8_t inited;
uint32_t seq;
int32_t refSetId;
TdThread thread;
stopDnodeFn stopDnode;
} SWalMgmt;
@ -88,7 +88,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
return NULL;
}
if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) {
if (taosThreadRwlockInit(&pWal->mutex, NULL) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(pWal);
return NULL;
@ -179,7 +179,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
_err:
taosArrayDestroy(pWal->fileInfoSet);
taosHashCleanup(pWal->pRefHash);
TAOS_UNUSED(taosThreadMutexDestroy(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockDestroy(&pWal->mutex));
taosMemoryFreeClear(pWal);
return NULL;
@ -215,15 +215,15 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
int32_t walPersist(SWal *pWal) {
int32_t code = 0;
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
code = walSaveMeta(pWal);
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(code);
}
void walClose(SWal *pWal) {
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
(void)walSaveMeta(pWal);
TAOS_UNUSED(taosCloseFile(&pWal->pLogFile));
pWal->pLogFile = NULL;
@ -243,7 +243,7 @@ void walClose(SWal *pWal) {
}
taosHashCleanup(pWal->pRefHash);
pWal->pRefHash = NULL;
(void)taosThreadMutexUnlock(&pWal->mutex);
(void)taosThreadRwlockUnlock(&pWal->mutex);
if (pWal->cfg.level == TAOS_WAL_SKIP) {
wInfo("vgId:%d, remove all wals, path:%s", pWal->cfg.vgId, pWal->path);
@ -258,7 +258,7 @@ static void walFreeObj(void *wal) {
SWal *pWal = wal;
wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal);
(void)taosThreadMutexDestroy(&pWal->mutex);
(void)taosThreadRwlockDestroy(&pWal->mutex);
taosMemoryFreeClear(pWal);
}

View File

@ -115,9 +115,9 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *eve
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset) {
// if offset version is small than first version , let's seek to first version
TAOS_UNUSED(taosThreadMutexLock(&pWalReader->pWal->mutex));
TAOS_UNUSED(taosThreadRwlockRdlock(&pWalReader->pWal->mutex));
int64_t firstVer = walGetFirstVer((pWalReader)->pWal);
TAOS_UNUSED(taosThreadMutexUnlock(&pWalReader->pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWalReader->pWal->mutex));
if (pOffset->version < firstVer) {
pOffset->version = firstVer;
@ -201,21 +201,21 @@ static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
// bsearch in fileSet
SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
taosThreadMutexLock(&pWal->mutex);
TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex));
SWalFileInfo *gloablPRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
if (gloablPRet == NULL) {
wError("failed to find WAL log file with ver:%" PRId64, ver);
taosThreadMutexUnlock(&pWal->mutex);
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
}
SWalFileInfo *pRet = taosMemoryMalloc(sizeof(SWalFileInfo));
if (pRet == NULL) {
wError("failed to allocate memory for localRet");
taosThreadMutexUnlock(&pWal->mutex);
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
TAOS_MEMCPY(pRet, gloablPRet, sizeof(SWalFileInfo));
taosThreadMutexUnlock(&pWal->mutex);
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
if (pReader->curFileFirstVer != pRet->firstVer) {
// error code was set inner
TAOS_CHECK_RETURN_WITH_FREE(walReadChangeFile(pReader, pRet->firstVer), pRet);

View File

@ -61,34 +61,34 @@ int32_t walSetRefVer(SWalRef *pRef, int64_t ver) {
SWal *pWal = pRef->pWal;
wDebug("vgId:%d, wal ref version %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, ver, pRef->refId);
if (pRef->refVer != ver) {
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
}
pRef->refVer = ver;
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
}
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
void walRefFirstVer(SWal *pWal, SWalRef *pRef) {
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex));
pRef->refVer = pWal->vers.firstVer;
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, pRef->refVer);
}
void walRefLastVer(SWal *pWal, SWalRef *pRef) {
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex));
pRef->refVer = pWal->vers.lastVer;
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, pRef->refVer);
}

View File

@ -23,7 +23,7 @@
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
int32_t code = 0;
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
wInfo("vgId:%d, restore from snapshot, version %" PRId64, pWal->cfg.vgId, ver);
@ -34,7 +34,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
SWalRef *pRef = *(SWalRef **)pIter;
if (pRef->refVer != -1 && pRef->refVer <= ver) {
taosHashCancelIterate(pWal->pRefHash, pIter);
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_FAILED);
}
@ -51,7 +51,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
if (taosRemoveFile(fnameStr) < 0) {
wError("vgId:%d, restore from snapshot, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
}
@ -60,7 +60,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
if (taosRemoveFile(fnameStr) < 0) {
wError("vgId:%d, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
}
@ -81,7 +81,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
pWal->vers.snapshotVer = ver;
pWal->vers.verInSnapshotting = -1;
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -160,12 +160,12 @@ static int64_t walChangeWrite(SWal *pWal, int64_t ver) {
}
int32_t walRollback(SWal *pWal, int64_t ver) {
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver);
int64_t code;
char fnameStr[WAL_FILE_LEN];
if (ver > pWal->vers.lastVer || ver <= pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
}
@ -175,7 +175,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// change current files
code = walChangeWrite(pWal, ver);
if (code < 0) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(code);
}
@ -198,21 +198,21 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
TAOS_UNUSED(taosCloseFile(&pWal->pIdxFile));
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pIdxFile == NULL) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
}
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
code = taosLSeekFile(pIdxFile, idxOff, SEEK_SET);
if (code < 0) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
}
// read idx file and get log file pos
SWalIdxEntry entry;
if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
}
@ -223,14 +223,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, fnameStr);
if (pLogFile == NULL) {
// TODO
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
}
code = taosLSeekFile(pLogFile, entry.offset, SEEK_SET);
if (code < 0) {
// TODO
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
}
@ -238,19 +238,19 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
SWalCkHead head;
int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead));
if (size != sizeof(SWalCkHead)) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
}
code = walValidHeadCksum(&head);
if (code != 0) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
}
if (head.head.version != ver) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
}
@ -258,13 +258,13 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// truncate old files
code = taosFtruncateFile(pLogFile, entry.offset);
if (code < 0) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
}
code = taosFtruncateFile(pIdxFile, idxOff);
if (code < 0) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
}
@ -278,13 +278,13 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
code = walSaveMeta(pWal);
if (code < 0) {
wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, terrstr());
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(code);
}
// unlock
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -375,7 +375,7 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) {
TAOS_RETURN(TSDB_CODE_FAILED);
}
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
pWal->vers.verInSnapshotting = ver;
pWal->vers.logRetention = logRetention;
@ -391,7 +391,7 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) {
}
_exit:
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(code);
}
@ -399,7 +399,7 @@ _exit:
int32_t walEndSnapshot(SWal *pWal) {
int32_t code = 0, lino = 0;
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
int64_t ver = pWal->vers.verInSnapshotting;
wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", log retention %" PRId64 " first ver %" PRId64
@ -508,7 +508,7 @@ int32_t walEndSnapshot(SWal *pWal) {
taosArrayClear(pWal->toDeleteFiles);
_exit:
taosThreadMutexUnlock(&pWal->mutex);
taosThreadRwlockWrlock(&pWal->mutex);
if (code) {
wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
@ -719,7 +719,7 @@ int32_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syn
int32_t bodyLen) {
int32_t code = 0, lino = 0;
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
if (index != pWal->vers.lastVer + 1) {
TAOS_CHECK_GOTO(TSDB_CODE_WAL_INVALID_VER, &lino, _exit);
@ -738,7 +738,7 @@ _exit:
wError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
return code;
}
@ -749,7 +749,7 @@ int32_t walFsync(SWal *pWal, bool forceFsync) {
return code;
}
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
if (taosFsyncFile(pWal->pLogFile) < 0) {
@ -758,7 +758,7 @@ int32_t walFsync(SWal *pWal, bool forceFsync) {
code = TAOS_SYSTEM_ERROR(errno);
}
}
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
return code;
}