From ca2e761a86b78e6e7595ce15966f20847903ce42 Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Wed, 11 Sep 2024 10:24:04 +0800 Subject: [PATCH] Add mutex while read wal --- include/util/tutil.h | 22 +++++++++++++++------- source/libs/wal/src/walRead.c | 8 +++++--- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/include/util/tutil.h b/include/util/tutil.h index 6321d2011a..5fa06b7e61 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -73,14 +73,14 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar char buf[TSDB_PASSWORD_LEN + 1]; buf[TSDB_PASSWORD_LEN] = 0; - (void)sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], context.digest[1], - context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], - context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], - context.digest[12], context.digest[13], context.digest[14], context.digest[15]); + (void)sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], + context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], + context.digest[6], context.digest[7], context.digest[8], context.digest[9], context.digest[10], + context.digest[11], context.digest[12], context.digest[13], context.digest[14], context.digest[15]); (void)memcpy(target, buf, TSDB_PASSWORD_LEN); } -static FORCE_INLINE int32_t taosHashBinary(char* pBuf, int32_t len) { +static FORCE_INLINE int32_t taosHashBinary(char *pBuf, int32_t len) { uint64_t hashVal = MurmurHash3_64(pBuf, len); return sprintf(pBuf, "%" PRIu64, hashVal); } @@ -161,8 +161,7 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, #define TCONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr)-offsetof(type, member))) -#define TAOS_GET_TERRNO(code) \ - (terrno == 0 ? code : terrno) +#define TAOS_GET_TERRNO(code) (terrno == 0 ? code : terrno) #define TAOS_RETURN(CODE) \ do { \ @@ -177,6 +176,15 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, } \ } while (0) +#define TAOS_CHECK_RETURN_WITH_MUTEX(CMD, MUTEX) \ + do { \ + int32_t __c = (CMD); \ + if (__c != TSDB_CODE_SUCCESS) { \ + taosThreadMutexUnlock(MUTEX); \ + TAOS_RETURN(__c); \ + } \ + } while (0) + #define TAOS_CHECK_GOTO(CMD, LINO, LABEL) \ do { \ code = (CMD); \ diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index deb5a07672..5568270efe 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -201,21 +201,23 @@ static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { // bsearch in fileSet SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; + taosThreadMutexLock(&pWal->mutex); SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); if (pRet == NULL) { wError("failed to find WAL log file with ver:%" PRId64, ver); - + taosThreadMutexUnlock(&pWal->mutex); TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } if (pReader->curFileFirstVer != pRet->firstVer) { // error code was set inner - TAOS_CHECK_RETURN(walReadChangeFile(pReader, pRet->firstVer)); + TAOS_CHECK_RETURN_WITH_MUTEX(walReadChangeFile(pReader, pRet->firstVer), &pWal->mutex); } // error code was set inner - TAOS_CHECK_RETURN(walReadSeekFilePos(pReader, pRet->firstVer, ver)); + TAOS_CHECK_RETURN_WITH_MUTEX(walReadSeekFilePos(pReader, pRet->firstVer, ver), &pWal->mutex); + taosThreadMutexUnlock(&pWal->mutex); wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver); pReader->curVersion = ver;