feat(wal): log applied ver
This commit is contained in:
parent
f93d465da7
commit
a3133b9f6d
|
@ -64,6 +64,7 @@ typedef struct {
|
||||||
int64_t verInSnapshotting;
|
int64_t verInSnapshotting;
|
||||||
int64_t snapshotVer;
|
int64_t snapshotVer;
|
||||||
int64_t commitVer;
|
int64_t commitVer;
|
||||||
|
int64_t appliedVer;
|
||||||
int64_t lastVer;
|
int64_t lastVer;
|
||||||
} SWalVer;
|
} SWalVer;
|
||||||
|
|
||||||
|
@ -172,6 +173,9 @@ int32_t walRollback(SWal *, int64_t ver);
|
||||||
int32_t walBeginSnapshot(SWal *, int64_t ver);
|
int32_t walBeginSnapshot(SWal *, int64_t ver);
|
||||||
int32_t walEndSnapshot(SWal *);
|
int32_t walEndSnapshot(SWal *);
|
||||||
int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
|
int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
|
||||||
|
// for tq
|
||||||
|
int32_t walApplyVer(SWal *, int64_t ver);
|
||||||
|
|
||||||
// int32_t walDataCorrupted(SWal*);
|
// int32_t walDataCorrupted(SWal*);
|
||||||
|
|
||||||
// read
|
// read
|
||||||
|
@ -186,7 +190,6 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
|
||||||
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
|
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
|
||||||
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
|
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
|
||||||
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
|
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t refId;
|
int64_t refId;
|
||||||
int64_t ver;
|
int64_t ver;
|
||||||
|
@ -206,6 +209,7 @@ int64_t walGetFirstVer(SWal *);
|
||||||
int64_t walGetSnapshotVer(SWal *);
|
int64_t walGetSnapshotVer(SWal *);
|
||||||
int64_t walGetLastVer(SWal *);
|
int64_t walGetLastVer(SWal *);
|
||||||
int64_t walGetCommittedVer(SWal *);
|
int64_t walGetCommittedVer(SWal *);
|
||||||
|
int64_t walGetAppliedVer(SWal *);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -265,6 +265,10 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
||||||
int64_t consumerId = be64toh(pReq->consumerId);
|
int64_t consumerId = be64toh(pReq->consumerId);
|
||||||
|
|
||||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||||
|
if (pConsumer == NULL) {
|
||||||
|
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
atomic_store_32(&pConsumer->hbStatus, 0);
|
atomic_store_32(&pConsumer->hbStatus, 0);
|
||||||
|
|
||||||
|
|
|
@ -237,6 +237,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
||||||
|
walApplyVer(pTq->pVnode->pWal, ver);
|
||||||
|
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0;
|
if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0;
|
||||||
|
|
||||||
|
@ -253,4 +255,3 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -878,6 +878,8 @@ _exit:
|
||||||
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
|
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vDebug("successful submit in vg %d version %ld", pVnode->config.vgId, version);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,8 @@ int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
|
||||||
|
|
||||||
int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVer; }
|
int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVer; }
|
||||||
|
|
||||||
|
int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) { return pWal->vers.appliedVer; }
|
||||||
|
|
||||||
static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
|
static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
|
||||||
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
|
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,9 +66,15 @@ void walCloseReader(SWalReader *pRead) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walNextValidMsg(SWalReader *pRead) {
|
int32_t walNextValidMsg(SWalReader *pRead) {
|
||||||
wDebug("vgId:%d wal start to fetch", pRead->pWal->cfg.vgId);
|
|
||||||
int64_t fetchVer = pRead->curVersion;
|
int64_t fetchVer = pRead->curVersion;
|
||||||
int64_t endVer = pRead->cond.scanUncommited ? walGetLastVer(pRead->pWal) : walGetCommittedVer(pRead->pWal);
|
int64_t lastVer = walGetLastVer(pRead->pWal);
|
||||||
|
int64_t committedVer = walGetCommittedVer(pRead->pWal);
|
||||||
|
int64_t appliedVer = walGetAppliedVer(pRead->pWal);
|
||||||
|
int64_t endVer = pRead->cond.scanUncommited ? lastVer : committedVer;
|
||||||
|
endVer = TMIN(appliedVer, endVer);
|
||||||
|
|
||||||
|
wDebug("vgId:%d wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld",
|
||||||
|
pRead->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
|
||||||
while (fetchVer <= endVer) {
|
while (fetchVer <= endVer) {
|
||||||
if (walFetchHeadNew(pRead, fetchVer) < 0) {
|
if (walFetchHeadNew(pRead, fetchVer) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -64,6 +64,12 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t walApplyVer(SWal *pWal, int64_t ver) {
|
||||||
|
// TODO: error check
|
||||||
|
pWal->vers.appliedVer = ver;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t walCommit(SWal *pWal, int64_t ver) {
|
int32_t walCommit(SWal *pWal, int64_t ver) {
|
||||||
ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
|
ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
|
||||||
ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
|
ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
|
||||||
|
|
|
@ -225,7 +225,7 @@ class TDTestCase:
|
||||||
tdSql.prepare()
|
tdSql.prepare()
|
||||||
self.prepareTestEnv()
|
self.prepareTestEnv()
|
||||||
self.tmqCase1()
|
self.tmqCase1()
|
||||||
# self.tmqCase2() TD-17267
|
# self.tmqCase2() # TD-17267
|
||||||
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
|
Loading…
Reference in New Issue