From 2a71d47668208ec814269fad359bf95a3c0d41a9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 28 Jan 2023 16:11:17 +0800 Subject: [PATCH 1/2] fix: wal ref --- include/libs/wal/wal.h | 1 + source/dnode/vnode/src/tq/tq.c | 4 +++- source/libs/wal/src/walRef.c | 29 +++++++++++++++++++++++++++-- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index a1ae1e429d..bef7301a07 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -201,6 +201,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead); int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead); int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); +SWalRef *walRefFirstVer(SWal *); SWalRef *walRefCommittedVer(SWal *); SWalRef *walOpenRef(SWal *); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1d5fae33eb..7649e8a006 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -521,7 +521,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqOffsetResetToData(&fetchOffsetNew, 0, 0); } } else { - tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal)); + int64_t firstVer = walGetFirstVer(pTq->pVnode->pWal); + walRefVer(pHandle->pRef, firstVer); + tqOffsetResetToLog(&fetchOffsetNew, firstVer - 1); } } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index e86111109c..f5cfe9abae 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -77,14 +77,39 @@ void walUnrefVer(SWalRef *pRef) { } #endif -SWalRef *walRefCommittedVer(SWal *pWal) { +SWalRef *walRefFirstVer(SWal *pWal) { SWalRef *pRef = walOpenRef(pWal); if (pRef == NULL) { return NULL; } taosThreadMutexLock(&pWal->mutex); - int64_t ver = walGetCommittedVer(pWal); + int64_t ver = walGetFirstVer(pWal); + + wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver); + + pRef->refVer = ver; + // bsearch in fileSet + SWalFileInfo tmpInfo; + tmpInfo.firstVer = ver; + SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + ASSERT(pRet != NULL); + pRef->refFile = pRet->firstVer; + + taosThreadMutexUnlock(&pWal->mutex); + return pRef; +} + +SWalRef *walRefCommittedVer(SWal *pWal) { + SWalRef *pRef = walOpenRef(pWal); + if (pRef == NULL) { + return NULL; + } + taosThreadMutexLock(&pWal->mutex); + + int64_t ver = walGetCommittedVer(pWal); + + wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver); pRef->refVer = ver; // bsearch in fileSet From f5c4ca3380ba7361b8b2a89910896078303c6f8a Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 28 Jan 2023 17:17:10 +0800 Subject: [PATCH 2/2] fix: wal ref --- include/libs/wal/wal.h | 2 +- source/dnode/vnode/src/tq/tq.c | 11 ++++++++--- source/libs/wal/src/walRef.c | 8 +++++--- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index bef7301a07..a0f421212a 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -201,7 +201,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead); int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead); int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); -SWalRef *walRefFirstVer(SWal *); +SWalRef *walRefFirstVer(SWal *, SWalRef *); SWalRef *walRefCommittedVer(SWal *); SWalRef *walOpenRef(SWal *); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7649e8a006..b195cfafb0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -521,9 +521,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqOffsetResetToData(&fetchOffsetNew, 0, 0); } } else { - int64_t firstVer = walGetFirstVer(pTq->pVnode->pWal); - walRefVer(pHandle->pRef, firstVer); - tqOffsetResetToLog(&fetchOffsetNew, firstVer - 1); + pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef); + if (pHandle->pRef == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + tqOffsetResetToLog(&fetchOffsetNew, pHandle->pRef->refVer - 1); } } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { @@ -721,6 +724,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; + tqDebug("vgId:%d, delete sub: %s", pTq->pVnode->config.vgId, pReq->subKey); + taosWLockLatch(&pTq->pushLock); int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); if (code != 0) { diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index f5cfe9abae..43470f4c82 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -77,10 +77,12 @@ void walUnrefVer(SWalRef *pRef) { } #endif -SWalRef *walRefFirstVer(SWal *pWal) { - SWalRef *pRef = walOpenRef(pWal); +SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) { if (pRef == NULL) { - return NULL; + pRef = walOpenRef(pWal); + if (pRef == NULL) { + return NULL; + } } taosThreadMutexLock(&pWal->mutex);