wal/ref: use new return macros for ref

This commit is contained in:
Minglei Jin 2024-07-18 14:14:51 +08:00
parent 964397ea63
commit 180411434a
2 changed files with 35 additions and 29 deletions

View File

@ -21,6 +21,7 @@
#include "tdef.h" #include "tdef.h"
#include "tlog.h" #include "tlog.h"
#include "tmsg.h" #include "tmsg.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
@ -123,8 +124,7 @@ typedef struct SWal {
typedef struct { typedef struct {
int64_t refId; int64_t refId;
int64_t refVer; int64_t refVer;
// int64_t refFile; SWal *pWal;
SWal *pWal;
} SWalRef; } SWalRef;
typedef struct { typedef struct {
@ -135,10 +135,8 @@ typedef struct {
int8_t enableRef; int8_t enableRef;
} SWalFilterCond; } SWalFilterCond;
typedef struct SWalReader SWalReader;
// todo hide this struct // todo hide this struct
struct SWalReader { typedef struct SWalReader {
SWal *pWal; SWal *pWal;
int64_t readerId; int64_t readerId;
TdFilePtr pLogFile; TdFilePtr pLogFile;
@ -151,7 +149,7 @@ struct SWalReader {
TdThreadMutex mutex; TdThreadMutex mutex;
SWalFilterCond cond; SWalFilterCond cond;
SWalCkHead *pHead; SWalCkHead *pHead;
}; } SWalReader;
// module initialization // module initialization
int32_t walInit(); int32_t walInit();
@ -175,7 +173,6 @@ int32_t walRollback(SWal *, int64_t ver);
int32_t walBeginSnapshot(SWal *, int64_t ver, int64_t logRetention); int32_t walBeginSnapshot(SWal *, int64_t ver, int64_t logRetention);
int32_t walEndSnapshot(SWal *); int32_t walEndSnapshot(SWal *);
int32_t walRestoreFromSnapshot(SWal *, int64_t ver); int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
// for tq
int32_t walApplyVer(SWal *, int64_t ver); int32_t walApplyVer(SWal *, int64_t ver);
// wal reader // wal reader
@ -198,12 +195,11 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver);
int32_t walFetchBody(SWalReader *pRead); int32_t walFetchBody(SWalReader *pRead);
int32_t walSkipFetchBody(SWalReader *pRead); int32_t walSkipFetchBody(SWalReader *pRead);
void walRefFirstVer(SWal *, SWalRef *);
void walRefLastVer(SWal *, SWalRef *);
SWalRef *walOpenRef(SWal *); SWalRef *walOpenRef(SWal *);
void walCloseRef(SWal *pWal, int64_t refId); void walCloseRef(SWal *pWal, int64_t refId);
int32_t walSetRefVer(SWalRef *, int64_t ver); int32_t walSetRefVer(SWalRef *, int64_t ver);
void walRefFirstVer(SWal *, SWalRef *);
void walRefLastVer(SWal *, SWalRef *);
// helper function for raft // helper function for raft
bool walLogExist(SWal *, int64_t ver); bool walLogExist(SWal *, int64_t ver);

View File

@ -24,25 +24,35 @@ SWalRef *walOpenRef(SWal *pWal) {
if (pRef == NULL) { if (pRef == NULL) {
return NULL; return NULL;
} }
pRef->refId = tGenIdPI64(); pRef->refId = tGenIdPI64();
if (taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *))) {
taosMemoryFree(pRef);
return NULL;
}
pRef->refVer = -1; pRef->refVer = -1;
// pRef->refFile = -1;
pRef->pWal = pWal; pRef->pWal = pWal;
taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *));
return pRef; return pRef;
} }
void walCloseRef(SWal *pWal, int64_t refId) { void walCloseRef(SWal *pWal, int64_t refId) {
SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t)); SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t));
if (ppRef == NULL) return; if (ppRef) {
SWalRef *pRef = *ppRef; SWalRef *pRef = *ppRef;
if (pRef) {
wDebug("vgId:%d, wal close ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId); if (pRef) {
} else { wDebug("vgId:%d, wal close ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId);
wDebug("vgId:%d, wal close ref null, refId %" PRId64, pWal->cfg.vgId, refId);
taosMemoryFree(pRef);
} else {
wDebug("vgId:%d, wal close ref null, refId %" PRId64, pWal->cfg.vgId, refId);
}
(void)taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t));
} }
taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t));
taosMemoryFree(pRef);
} }
int32_t walSetRefVer(SWalRef *pRef, int64_t ver) { int32_t walSetRefVer(SWalRef *pRef, int64_t ver) {
@ -52,31 +62,31 @@ int32_t walSetRefVer(SWalRef *pRef, int64_t ver) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) { if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) {
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1; TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
} }
pRef->refVer = ver; pRef->refVer = ver;
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
} }
return 0; TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
void walRefFirstVer(SWal *pWal, SWalRef *pRef) { void walRefFirstVer(SWal *pWal, SWalRef *pRef) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
int64_t ver = walGetFirstVer(pWal);
pRef->refVer = ver; pRef->refVer = pWal->vers.firstVer;
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver); wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, pRef->refVer);
} }
void walRefLastVer(SWal *pWal, SWalRef *pRef) { void walRefLastVer(SWal *pWal, SWalRef *pRef) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
int64_t ver = walGetLastVer(pWal);
pRef->refVer = ver; pRef->refVer = pWal->vers.lastVer;
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, ver); wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, pRef->refVer);
} }