fix: wal ref

This commit is contained in:
Liu Jicong 2023-01-28 17:17:10 +08:00
parent 2a71d47668
commit f5c4ca3380
3 changed files with 14 additions and 7 deletions

View File

@ -201,7 +201,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
SWalRef *walRefFirstVer(SWal *);
SWalRef *walRefFirstVer(SWal *, SWalRef *);
SWalRef *walRefCommittedVer(SWal *);
SWalRef *walOpenRef(SWal *);

View File

@ -521,9 +521,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqOffsetResetToData(&fetchOffsetNew, 0, 0);
}
} else {
int64_t firstVer = walGetFirstVer(pTq->pVnode->pWal);
walRefVer(pHandle->pRef, firstVer);
tqOffsetResetToLog(&fetchOffsetNew, firstVer - 1);
pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
if (pHandle->pRef == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tqOffsetResetToLog(&fetchOffsetNew, pHandle->pRef->refVer - 1);
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
@ -721,6 +724,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
tqDebug("vgId:%d, delete sub: %s", pTq->pVnode->config.vgId, pReq->subKey);
taosWLockLatch(&pTq->pushLock);
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
if (code != 0) {

View File

@ -77,10 +77,12 @@ void walUnrefVer(SWalRef *pRef) {
}
#endif
SWalRef *walRefFirstVer(SWal *pWal) {
SWalRef *pRef = walOpenRef(pWal);
SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
if (pRef == NULL) {
return NULL;
pRef = walOpenRef(pWal);
if (pRef == NULL) {
return NULL;
}
}
taosThreadMutexLock(&pWal->mutex);