diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 0beb7c3b83..9992e18584 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -21,6 +21,7 @@ #include "tdef.h" #include "tlog.h" #include "tmsg.h" + #ifdef __cplusplus extern "C" { #endif @@ -123,8 +124,7 @@ typedef struct SWal { typedef struct { int64_t refId; int64_t refVer; - // int64_t refFile; - SWal *pWal; + SWal *pWal; } SWalRef; typedef struct { @@ -135,10 +135,8 @@ typedef struct { int8_t enableRef; } SWalFilterCond; -typedef struct SWalReader SWalReader; - // todo hide this struct -struct SWalReader { +typedef struct SWalReader { SWal *pWal; int64_t readerId; TdFilePtr pLogFile; @@ -151,7 +149,7 @@ struct SWalReader { TdThreadMutex mutex; SWalFilterCond cond; SWalCkHead *pHead; -}; +} SWalReader; // module initialization int32_t walInit(); @@ -175,7 +173,6 @@ int32_t walRollback(SWal *, int64_t ver); int32_t walBeginSnapshot(SWal *, int64_t ver, int64_t logRetention); int32_t walEndSnapshot(SWal *); int32_t walRestoreFromSnapshot(SWal *, int64_t ver); -// for tq int32_t walApplyVer(SWal *, int64_t ver); // wal reader @@ -198,12 +195,11 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver); int32_t walFetchBody(SWalReader *pRead); int32_t walSkipFetchBody(SWalReader *pRead); -void walRefFirstVer(SWal *, SWalRef *); -void walRefLastVer(SWal *, SWalRef *); - SWalRef *walOpenRef(SWal *); void walCloseRef(SWal *pWal, int64_t refId); int32_t walSetRefVer(SWalRef *, int64_t ver); +void walRefFirstVer(SWal *, SWalRef *); +void walRefLastVer(SWal *, SWalRef *); // helper function for raft bool walLogExist(SWal *, int64_t ver); diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index b7169dec53..26447f60d9 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -24,25 +24,35 @@ SWalRef *walOpenRef(SWal *pWal) { if (pRef == NULL) { return NULL; } + pRef->refId = tGenIdPI64(); + + if (taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *))) { + taosMemoryFree(pRef); + return NULL; + } + pRef->refVer = -1; - // pRef->refFile = -1; pRef->pWal = pWal; - taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *)); + return pRef; } void walCloseRef(SWal *pWal, int64_t refId) { SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t)); - if (ppRef == NULL) return; - SWalRef *pRef = *ppRef; - if (pRef) { - wDebug("vgId:%d, wal close ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId); - } else { - wDebug("vgId:%d, wal close ref null, refId %" PRId64, pWal->cfg.vgId, refId); + if (ppRef) { + SWalRef *pRef = *ppRef; + + if (pRef) { + wDebug("vgId:%d, wal close ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId); + + taosMemoryFree(pRef); + } else { + wDebug("vgId:%d, wal close ref null, refId %" PRId64, pWal->cfg.vgId, refId); + } + + (void)taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t)); } - taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t)); - taosMemoryFree(pRef); } int32_t walSetRefVer(SWalRef *pRef, int64_t ver) { @@ -52,31 +62,31 @@ int32_t walSetRefVer(SWalRef *pRef, int64_t ver) { taosThreadMutexLock(&pWal->mutex); if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) { taosThreadMutexUnlock(&pWal->mutex); - terrno = TSDB_CODE_WAL_INVALID_VER; - return -1; + + TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } pRef->refVer = ver; taosThreadMutexUnlock(&pWal->mutex); } - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } void walRefFirstVer(SWal *pWal, SWalRef *pRef) { taosThreadMutexLock(&pWal->mutex); - int64_t ver = walGetFirstVer(pWal); - pRef->refVer = ver; + + pRef->refVer = pWal->vers.firstVer; taosThreadMutexUnlock(&pWal->mutex); - wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver); + wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, pRef->refVer); } void walRefLastVer(SWal *pWal, SWalRef *pRef) { taosThreadMutexLock(&pWal->mutex); - int64_t ver = walGetLastVer(pWal); - pRef->refVer = ver; + + pRef->refVer = pWal->vers.lastVer; taosThreadMutexUnlock(&pWal->mutex); - wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, ver); + wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, pRef->refVer); }