fix(taosx): set version
This commit is contained in:
parent
60bb2c18dc
commit
f0a66602da
|
@ -219,17 +219,20 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
|
|||
SWalCont* pHead = &((*ppCkHead)->head);
|
||||
if (IS_META_MSG(pHead->msgType)) {
|
||||
code = walFetchBody(pHandle->pWalReader, ppCkHead);
|
||||
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
*fetchOffset = offset;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
if (isValValidForTable(pHandle, pHead)) {
|
||||
*fetchOffset = offset;
|
||||
code = 0;
|
||||
goto END;
|
||||
} else {
|
||||
offset++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -198,7 +198,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
wDebug("vgId:%d, wal version reset from index:%" PRId64 "(invalid:%d) to index:%" PRId64, pReader->pWal->cfg.vgId,
|
||||
wDebug("vgId:%d, wal version reset from %" PRId64 "(invalid:%d) to %" PRId64, pReader->pWal->cfg.vgId,
|
||||
pReader->curVersion, pReader->curInvalid, ver);
|
||||
|
||||
pReader->curVersion = ver;
|
||||
|
@ -350,9 +350,10 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
|||
int64_t contLen;
|
||||
bool seeked = false;
|
||||
|
||||
wDebug("vgId:%d try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64,
|
||||
pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
|
||||
pRead->pWal->vers.lastVer);
|
||||
wDebug("vgId:%d try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
|
||||
", applied ver:%" PRId64,
|
||||
pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
|
||||
pRead->pWal->vers.appliedVer);
|
||||
|
||||
// TODO: valid ver
|
||||
if (ver > pRead->pWal->vers.appliedVer) {
|
||||
|
@ -404,6 +405,11 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
|||
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
|
||||
int64_t code;
|
||||
|
||||
wDebug("vgId:%d skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
|
||||
", applied ver:%" PRId64,
|
||||
pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
|
||||
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);
|
||||
|
||||
ASSERT(pRead->curVersion == pHead->head.version);
|
||||
ASSERT(pRead->curInvalid == 0);
|
||||
|
||||
|
@ -423,6 +429,11 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
|||
SWalCont *pReadHead = &((*ppHead)->head);
|
||||
int64_t ver = pReadHead->version;
|
||||
|
||||
wDebug("vgId:%d fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
|
||||
", applied ver:%" PRId64,
|
||||
pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
|
||||
pRead->pWal->vers.appliedVer);
|
||||
|
||||
if (pRead->capacity < pReadHead->bodyLen) {
|
||||
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
||||
if (ptr == NULL) {
|
||||
|
|
Loading…
Reference in New Issue