Add mutex while read wal

This commit is contained in:
xiao-77 2024-09-11 10:24:04 +08:00
parent be80848d7e
commit ca2e761a86
2 changed files with 20 additions and 10 deletions

View File

@ -73,14 +73,14 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar
char buf[TSDB_PASSWORD_LEN + 1]; char buf[TSDB_PASSWORD_LEN + 1];
buf[TSDB_PASSWORD_LEN] = 0; buf[TSDB_PASSWORD_LEN] = 0;
(void)sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], context.digest[1], (void)sprintf(buf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0],
context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5],
context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], context.digest[6], context.digest[7], context.digest[8], context.digest[9], context.digest[10],
context.digest[12], context.digest[13], context.digest[14], context.digest[15]); context.digest[11], context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
(void)memcpy(target, buf, TSDB_PASSWORD_LEN); (void)memcpy(target, buf, TSDB_PASSWORD_LEN);
} }
static FORCE_INLINE int32_t taosHashBinary(char* pBuf, int32_t len) { static FORCE_INLINE int32_t taosHashBinary(char *pBuf, int32_t len) {
uint64_t hashVal = MurmurHash3_64(pBuf, len); uint64_t hashVal = MurmurHash3_64(pBuf, len);
return sprintf(pBuf, "%" PRIu64, hashVal); return sprintf(pBuf, "%" PRIu64, hashVal);
} }
@ -161,8 +161,7 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
#define TCONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr)-offsetof(type, member))) #define TCONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr)-offsetof(type, member)))
#define TAOS_GET_TERRNO(code) \ #define TAOS_GET_TERRNO(code) (terrno == 0 ? code : terrno)
(terrno == 0 ? code : terrno)
#define TAOS_RETURN(CODE) \ #define TAOS_RETURN(CODE) \
do { \ do { \
@ -177,6 +176,15 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
} \ } \
} while (0) } while (0)
#define TAOS_CHECK_RETURN_WITH_MUTEX(CMD, MUTEX) \
do { \
int32_t __c = (CMD); \
if (__c != TSDB_CODE_SUCCESS) { \
taosThreadMutexUnlock(MUTEX); \
TAOS_RETURN(__c); \
} \
} while (0)
#define TAOS_CHECK_GOTO(CMD, LINO, LABEL) \ #define TAOS_CHECK_GOTO(CMD, LINO, LABEL) \
do { \ do { \
code = (CMD); \ code = (CMD); \

View File

@ -201,21 +201,23 @@ static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
// bsearch in fileSet // bsearch in fileSet
SWalFileInfo tmpInfo; SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver; tmpInfo.firstVer = ver;
taosThreadMutexLock(&pWal->mutex);
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
if (pRet == NULL) { if (pRet == NULL) {
wError("failed to find WAL log file with ver:%" PRId64, ver); wError("failed to find WAL log file with ver:%" PRId64, ver);
taosThreadMutexUnlock(&pWal->mutex);
TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
} }
if (pReader->curFileFirstVer != pRet->firstVer) { if (pReader->curFileFirstVer != pRet->firstVer) {
// error code was set inner // error code was set inner
TAOS_CHECK_RETURN(walReadChangeFile(pReader, pRet->firstVer)); TAOS_CHECK_RETURN_WITH_MUTEX(walReadChangeFile(pReader, pRet->firstVer), &pWal->mutex);
} }
// error code was set inner // error code was set inner
TAOS_CHECK_RETURN(walReadSeekFilePos(pReader, pRet->firstVer, ver)); TAOS_CHECK_RETURN_WITH_MUTEX(walReadSeekFilePos(pReader, pRet->firstVer, ver), &pWal->mutex);
taosThreadMutexUnlock(&pWal->mutex);
wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver); wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver);
pReader->curVersion = ver; pReader->curVersion = ver;