Merge pull request #27889 from taosdata/fix/3.0/TD-31990

Fix/3.0/td 31990
This commit is contained in:
Hongze Cheng 2024-09-14 18:13:00 +08:00 committed by GitHub
commit cd827087ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 98 additions and 73 deletions

View File

@ -112,8 +112,8 @@ typedef struct SWal {
int64_t totSize;
int64_t lastRollSeq;
// ctl
int64_t refId;
TdThreadMutex mutex;
int64_t refId;
TdThreadRwlock mutex;
// ref
SHashObj *pRefHash; // refId -> SWalRef
// path

View File

@ -73,14 +73,14 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar
char buf[TSDB_PASSWORD_LEN + 1];
buf[TSDB_PASSWORD_LEN] = 0;
(void)sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], context.digest[1],
context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6],
context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11],
context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
(void)sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0],
context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5],
context.digest[6], context.digest[7], context.digest[8], context.digest[9], context.digest[10],
context.digest[11], context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
(void)memcpy(target, buf, TSDB_PASSWORD_LEN);
}
static FORCE_INLINE int32_t taosHashBinary(char* pBuf, int32_t len) {
static FORCE_INLINE int32_t taosHashBinary(char *pBuf, int32_t len) {
uint64_t hashVal = MurmurHash3_64(pBuf, len);
return sprintf(pBuf, "%" PRIu64, hashVal);
}
@ -161,8 +161,7 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
#define TCONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr)-offsetof(type, member)))
#define TAOS_GET_TERRNO(code) \
(terrno == 0 ? code : terrno)
#define TAOS_GET_TERRNO(code) (terrno == 0 ? code : terrno)
#define TAOS_RETURN(CODE) \
do { \
@ -177,6 +176,15 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
} \
} while (0)
#define TAOS_CHECK_RETURN_WITH_FREE(CMD, PTR) \
do { \
int32_t __c = (CMD); \
if (__c != TSDB_CODE_SUCCESS) { \
taosMemoryFree(PTR); \
TAOS_RETURN(__c); \
} \
} while (0)
#define TAOS_CHECK_GOTO(CMD, LINO, LABEL) \
do { \
code = (CMD); \

View File

@ -3294,11 +3294,11 @@ _out:;
// proceed match index, with replicating on needed
SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append");
if(pEntry != NULL)
if (pEntry != NULL)
sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
", %" PRId64 ")",
ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
", %" PRId64 ")",
ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
(void)syncNodeUpdateAssignedCommitIndex(ths, matchIndex);
@ -3449,8 +3449,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pMsgReply->startTime = ths->startTime;
pMsgReply->timeStamp = tsMs;
sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64, ths->vgId,
DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);
sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64,
ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);
if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
raftStoreSetTerm(ths, pMsg->term);

View File

@ -684,7 +684,7 @@ _err:
int64_t walGetVerRetention(SWal* pWal, int64_t bytes) {
int64_t ver = -1;
int64_t totSize = 0;
(void)taosThreadMutexLock(&pWal->mutex);
(void)taosThreadRwlockRdlock(&pWal->mutex);
int32_t fileIdx = taosArrayGetSize(pWal->fileInfoSet);
while (--fileIdx) {
SWalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
@ -694,7 +694,7 @@ int64_t walGetVerRetention(SWal* pWal, int64_t bytes) {
}
totSize += pInfo->fileSize;
}
(void)taosThreadMutexUnlock(&pWal->mutex);
(void)taosThreadRwlockUnlock(&pWal->mutex);
return ver + 1;
}

View File

@ -21,11 +21,11 @@
#include "walInt.h"
typedef struct {
int8_t stop;
int8_t inited;
uint32_t seq;
int32_t refSetId;
TdThread thread;
int8_t stop;
int8_t inited;
uint32_t seq;
int32_t refSetId;
TdThread thread;
stopDnodeFn stopDnode;
} SWalMgmt;
@ -81,6 +81,15 @@ void walCleanUp() {
}
}
static int32_t walInitLock(SWal *pWal) {
TdThreadRwlockAttr attr;
(void)taosThreadRwlockAttrInit(&attr);
(void)taosThreadRwlockAttrSetKindNP(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
(void)taosThreadRwlockInit(&pWal->mutex, &attr);
(void)taosThreadRwlockAttrDestroy(&attr);
return 0;
}
SWal *walOpen(const char *path, SWalCfg *pCfg) {
SWal *pWal = taosMemoryCalloc(1, sizeof(SWal));
if (pWal == NULL) {
@ -88,7 +97,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
return NULL;
}
if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) {
if (walInitLock(pWal) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(pWal);
return NULL;
@ -179,7 +188,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
_err:
taosArrayDestroy(pWal->fileInfoSet);
taosHashCleanup(pWal->pRefHash);
TAOS_UNUSED(taosThreadMutexDestroy(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockDestroy(&pWal->mutex));
taosMemoryFreeClear(pWal);
return NULL;
@ -215,15 +224,15 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
int32_t walPersist(SWal *pWal) {
int32_t code = 0;
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
code = walSaveMeta(pWal);
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(code);
}
void walClose(SWal *pWal) {
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
(void)walSaveMeta(pWal);
TAOS_UNUSED(taosCloseFile(&pWal->pLogFile));
pWal->pLogFile = NULL;
@ -243,7 +252,7 @@ void walClose(SWal *pWal) {
}
taosHashCleanup(pWal->pRefHash);
pWal->pRefHash = NULL;
(void)taosThreadMutexUnlock(&pWal->mutex);
(void)taosThreadRwlockUnlock(&pWal->mutex);
if (pWal->cfg.level == TAOS_WAL_SKIP) {
wInfo("vgId:%d, remove all wals, path:%s", pWal->cfg.vgId, pWal->path);
@ -258,7 +267,7 @@ static void walFreeObj(void *wal) {
SWal *pWal = wal;
wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal);
(void)taosThreadMutexDestroy(&pWal->mutex);
(void)taosThreadRwlockDestroy(&pWal->mutex);
taosMemoryFreeClear(pWal);
}

View File

@ -115,9 +115,9 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *eve
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset) {
// if offset version is small than first version , let's seek to first version
TAOS_UNUSED(taosThreadMutexLock(&pWalReader->pWal->mutex));
TAOS_UNUSED(taosThreadRwlockRdlock(&pWalReader->pWal->mutex));
int64_t firstVer = walGetFirstVer((pWalReader)->pWal);
TAOS_UNUSED(taosThreadMutexUnlock(&pWalReader->pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWalReader->pWal->mutex));
if (pOffset->version < firstVer) {
pOffset->version = firstVer;
@ -201,21 +201,29 @@ static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
// bsearch in fileSet
SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
if (pRet == NULL) {
TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex));
SWalFileInfo *gloablPRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
if (gloablPRet == NULL) {
wError("failed to find WAL log file with ver:%" PRId64, ver);
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
}
SWalFileInfo *pRet = taosMemoryMalloc(sizeof(SWalFileInfo));
if (pRet == NULL) {
wError("failed to allocate memory for localRet");
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
TAOS_MEMCPY(pRet, gloablPRet, sizeof(SWalFileInfo));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
if (pReader->curFileFirstVer != pRet->firstVer) {
// error code was set inner
TAOS_CHECK_RETURN(walReadChangeFile(pReader, pRet->firstVer));
TAOS_CHECK_RETURN_WITH_FREE(walReadChangeFile(pReader, pRet->firstVer), pRet);
}
// error code was set inner
TAOS_CHECK_RETURN(walReadSeekFilePos(pReader, pRet->firstVer, ver));
TAOS_CHECK_RETURN_WITH_FREE(walReadSeekFilePos(pReader, pRet->firstVer, ver), pRet);
taosMemoryFree(pRet);
wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver);
pReader->curVersion = ver;

View File

@ -61,34 +61,34 @@ int32_t walSetRefVer(SWalRef *pRef, int64_t ver) {
SWal *pWal = pRef->pWal;
wDebug("vgId:%d, wal ref version %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, ver, pRef->refId);
if (pRef->refVer != ver) {
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
}
pRef->refVer = ver;
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
}
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
void walRefFirstVer(SWal *pWal, SWalRef *pRef) {
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex));
pRef->refVer = pWal->vers.firstVer;
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, pRef->refVer);
}
void walRefLastVer(SWal *pWal, SWalRef *pRef) {
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex));
pRef->refVer = pWal->vers.lastVer;
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, pRef->refVer);
}

View File

@ -23,7 +23,7 @@
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
int32_t code = 0;
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
wInfo("vgId:%d, restore from snapshot, version %" PRId64, pWal->cfg.vgId, ver);
@ -34,7 +34,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
SWalRef *pRef = *(SWalRef **)pIter;
if (pRef->refVer != -1 && pRef->refVer <= ver) {
taosHashCancelIterate(pWal->pRefHash, pIter);
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_FAILED);
}
@ -51,7 +51,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
if (taosRemoveFile(fnameStr) < 0) {
wError("vgId:%d, restore from snapshot, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
}
@ -60,7 +60,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
if (taosRemoveFile(fnameStr) < 0) {
wError("vgId:%d, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
}
@ -81,7 +81,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
pWal->vers.snapshotVer = ver;
pWal->vers.verInSnapshotting = -1;
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -160,12 +160,12 @@ static int64_t walChangeWrite(SWal *pWal, int64_t ver) {
}
int32_t walRollback(SWal *pWal, int64_t ver) {
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver);
int64_t code;
char fnameStr[WAL_FILE_LEN];
if (ver > pWal->vers.lastVer || ver <= pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
}
@ -175,7 +175,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// change current files
code = walChangeWrite(pWal, ver);
if (code < 0) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(code);
}
@ -198,21 +198,21 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
TAOS_UNUSED(taosCloseFile(&pWal->pIdxFile));
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pIdxFile == NULL) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
}
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
code = taosLSeekFile(pIdxFile, idxOff, SEEK_SET);
if (code < 0) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
}
// read idx file and get log file pos
SWalIdxEntry entry;
if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
}
@ -223,14 +223,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, fnameStr);
if (pLogFile == NULL) {
// TODO
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
}
code = taosLSeekFile(pLogFile, entry.offset, SEEK_SET);
if (code < 0) {
// TODO
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
}
@ -238,19 +238,19 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
SWalCkHead head;
int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead));
if (size != sizeof(SWalCkHead)) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
}
code = walValidHeadCksum(&head);
if (code != 0) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
}
if (head.head.version != ver) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
}
@ -258,13 +258,13 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// truncate old files
code = taosFtruncateFile(pLogFile, entry.offset);
if (code < 0) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(code);
}
code = taosFtruncateFile(pIdxFile, idxOff);
if (code < 0) {
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(code);
}
@ -278,13 +278,13 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
code = walSaveMeta(pWal);
if (code < 0) {
wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, terrstr());
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(code);
}
// unlock
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -375,7 +375,7 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) {
TAOS_RETURN(TSDB_CODE_FAILED);
}
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
pWal->vers.verInSnapshotting = ver;
pWal->vers.logRetention = logRetention;
@ -391,7 +391,7 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) {
}
_exit:
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(code);
}
@ -399,7 +399,7 @@ _exit:
int32_t walEndSnapshot(SWal *pWal) {
int32_t code = 0, lino = 0;
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
int64_t ver = pWal->vers.verInSnapshotting;
wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", log retention %" PRId64 " first ver %" PRId64
@ -508,7 +508,7 @@ int32_t walEndSnapshot(SWal *pWal) {
taosArrayClear(pWal->toDeleteFiles);
_exit:
taosThreadMutexUnlock(&pWal->mutex);
taosThreadRwlockUnlock(&pWal->mutex);
if (code) {
wError("vgId:%d, %s failed at line %d since %s", pWal->cfg.vgId, __func__, lino, tstrerror(code));
@ -717,7 +717,7 @@ int32_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syn
int32_t bodyLen) {
int32_t code = 0, lino = 0;
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
if (index != pWal->vers.lastVer + 1) {
TAOS_CHECK_GOTO(TSDB_CODE_WAL_INVALID_VER, &lino, _exit);
@ -736,7 +736,7 @@ _exit:
wError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
return code;
}
@ -747,7 +747,7 @@ int32_t walFsync(SWal *pWal, bool forceFsync) {
return code;
}
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
if (taosFsyncFile(pWal->pLogFile) < 0) {
@ -756,7 +756,7 @@ int32_t walFsync(SWal *pWal, bool forceFsync) {
code = terrno;
}
}
TAOS_UNUSED(taosThreadMutexUnlock(&pWal->mutex));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
return code;
}