fix: wal ref
This commit is contained in:
parent
9615504024
commit
2a71d47668
|
@ -201,6 +201,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
|
||||||
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
|
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
|
||||||
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
|
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
|
||||||
|
|
||||||
|
SWalRef *walRefFirstVer(SWal *);
|
||||||
SWalRef *walRefCommittedVer(SWal *);
|
SWalRef *walRefCommittedVer(SWal *);
|
||||||
|
|
||||||
SWalRef *walOpenRef(SWal *);
|
SWalRef *walOpenRef(SWal *);
|
||||||
|
|
|
@ -521,7 +521,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tqOffsetResetToData(&fetchOffsetNew, 0, 0);
|
tqOffsetResetToData(&fetchOffsetNew, 0, 0);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
|
int64_t firstVer = walGetFirstVer(pTq->pVnode->pWal);
|
||||||
|
walRefVer(pHandle->pRef, firstVer);
|
||||||
|
tqOffsetResetToLog(&fetchOffsetNew, firstVer - 1);
|
||||||
}
|
}
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
|
|
@ -77,14 +77,39 @@ void walUnrefVer(SWalRef *pRef) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
SWalRef *walRefCommittedVer(SWal *pWal) {
|
SWalRef *walRefFirstVer(SWal *pWal) {
|
||||||
SWalRef *pRef = walOpenRef(pWal);
|
SWalRef *pRef = walOpenRef(pWal);
|
||||||
if (pRef == NULL) {
|
if (pRef == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
taosThreadMutexLock(&pWal->mutex);
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
|
||||||
int64_t ver = walGetCommittedVer(pWal);
|
int64_t ver = walGetFirstVer(pWal);
|
||||||
|
|
||||||
|
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
|
||||||
|
|
||||||
|
pRef->refVer = ver;
|
||||||
|
// bsearch in fileSet
|
||||||
|
SWalFileInfo tmpInfo;
|
||||||
|
tmpInfo.firstVer = ver;
|
||||||
|
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||||
|
ASSERT(pRet != NULL);
|
||||||
|
pRef->refFile = pRet->firstVer;
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
return pRef;
|
||||||
|
}
|
||||||
|
|
||||||
|
SWalRef *walRefCommittedVer(SWal *pWal) {
|
||||||
|
SWalRef *pRef = walOpenRef(pWal);
|
||||||
|
if (pRef == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
|
||||||
|
int64_t ver = walGetCommittedVer(pWal);
|
||||||
|
|
||||||
|
wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver);
|
||||||
|
|
||||||
pRef->refVer = ver;
|
pRef->refVer = ver;
|
||||||
// bsearch in fileSet
|
// bsearch in fileSet
|
||||||
|
|
Loading…
Reference in New Issue