fix:subscribe time & return -1 if wal not exist
This commit is contained in:
parent
2f6c565392
commit
ce7db7bf90
|
@ -898,7 +898,7 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
||||||
mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d",
|
mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d",
|
||||||
pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, mndConsumerStatusName(pConsumer->status),
|
pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, mndConsumerStatusName(pConsumer->status),
|
||||||
pConsumer->epoch);
|
pConsumer->epoch);
|
||||||
pConsumer->subscribeTime = taosGetTimestampMs();
|
pConsumer->subscribeTime = pConsumer->createTime;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -371,9 +371,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
||||||
pRead->pWal->vers.appliedVer);
|
pRead->pWal->vers.appliedVer);
|
||||||
|
|
||||||
// TODO: valid ver
|
// TODO: valid ver
|
||||||
// if (ver > pRead->pWal->vers.appliedVer) {
|
if (ver > pRead->pWal->vers.commitVer) {
|
||||||
// return -1;
|
return -1;
|
||||||
// }
|
}
|
||||||
|
|
||||||
if (pRead->curVersion != ver) {
|
if (pRead->curVersion != ver) {
|
||||||
code = walReaderSeekVer(pRead, ver);
|
code = walReaderSeekVer(pRead, ver);
|
||||||
|
|
Loading…
Reference in New Issue