fix(tmq): avoid seek to previous position.
This commit is contained in:
parent
9feb0a86d6
commit
d585f34ea9
|
@ -193,9 +193,10 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
|
||||||
void walCloseReader(SWalReader *pRead);
|
void walCloseReader(SWalReader *pRead);
|
||||||
void walReadReset(SWalReader *pReader);
|
void walReadReset(SWalReader *pReader);
|
||||||
int32_t walReadVer(SWalReader *pRead, int64_t ver);
|
int32_t walReadVer(SWalReader *pRead, int64_t ver);
|
||||||
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver);
|
int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver);
|
||||||
int32_t walNextValidMsg(SWalReader *pRead);
|
int32_t walNextValidMsg(SWalReader *pRead);
|
||||||
int64_t walReaderGetCurrentVer(const SWalReader* pReader);
|
int64_t walReaderGetCurrentVer(const SWalReader* pReader);
|
||||||
|
int64_t walReaderGetValidFirstVer(const SWalReader* pReader);
|
||||||
|
|
||||||
// only for tq usage
|
// only for tq usage
|
||||||
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
|
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
|
||||||
|
|
|
@ -294,7 +294,7 @@ void tqCloseReader(STqReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) {
|
int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) {
|
||||||
if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
|
if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
tqDebug("wal reader seek to ver:%"PRId64" %s", ver, id);
|
tqDebug("wal reader seek to ver:%"PRId64" %s", ver, id);
|
||||||
|
|
|
@ -107,26 +107,40 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
*pScanIdle = false;
|
*pScanIdle = false;
|
||||||
|
|
||||||
// seek the stored version and extract data from WAL
|
// seek the stored version and extract data from WAL
|
||||||
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
|
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
|
if (pTask->chkInfo.currentVer < firstVer) {
|
||||||
SWal *pWal = pTask->exec.pWalReader->pWal;
|
pTask->chkInfo.currentVer = firstVer;
|
||||||
if (pTask->chkInfo.currentVer < pWal->vers.firstVer ) {
|
tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId,
|
||||||
pTask->chkInfo.currentVer = pWal->vers.firstVer;
|
pTask->id.idStr, firstVer, pTask->chkInfo.currentVer);
|
||||||
code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
// todo need retry if failed
|
||||||
|
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// append the data for the stream
|
||||||
|
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
||||||
|
} else {
|
||||||
|
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
||||||
|
if (currentVer != -1) {
|
||||||
|
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// append the data for the stream
|
||||||
|
tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
||||||
}
|
}
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// append the data for the stream
|
// append the data for the stream
|
||||||
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
||||||
|
|
||||||
SPackedData packData = {0};
|
SPackedData packData = {0};
|
||||||
code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
|
int32_t code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
|
||||||
if (code != TSDB_CODE_SUCCESS) { // failed, continue
|
if (code != TSDB_CODE_SUCCESS) { // failed, continue
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -62,9 +62,6 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
||||||
void walCloseReader(SWalReader *pReader) {
|
void walCloseReader(SWalReader *pReader) {
|
||||||
taosCloseFile(&pReader->pIdxFile);
|
taosCloseFile(&pReader->pIdxFile);
|
||||||
taosCloseFile(&pReader->pLogFile);
|
taosCloseFile(&pReader->pLogFile);
|
||||||
/*if (pReader->cond.enableRef) {*/
|
|
||||||
/*taosHashRemove(pReader->pWal->pRefHash, &pReader->readerId, sizeof(int64_t));*/
|
|
||||||
/*}*/
|
|
||||||
taosMemoryFreeClear(pReader->pHead);
|
taosMemoryFreeClear(pReader->pHead);
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
}
|
}
|
||||||
|
@ -74,20 +71,22 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
||||||
int64_t lastVer = walGetLastVer(pReader->pWal);
|
int64_t lastVer = walGetLastVer(pReader->pWal);
|
||||||
int64_t committedVer = walGetCommittedVer(pReader->pWal);
|
int64_t committedVer = walGetCommittedVer(pReader->pWal);
|
||||||
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
|
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
|
||||||
|
|
||||||
if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
|
if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
|
||||||
wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
|
wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
|
||||||
// taosMsleep(10);
|
|
||||||
}
|
}
|
||||||
// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
|
|
||||||
int64_t endVer = TMIN(appliedVer, committedVer);
|
int64_t endVer = TMIN(appliedVer, committedVer);
|
||||||
|
|
||||||
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
|
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
|
||||||
", applied index:%" PRId64", end index:%" PRId64,
|
", applied index:%" PRId64", end index:%" PRId64,
|
||||||
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
|
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
|
||||||
|
|
||||||
while (fetchVer <= endVer) {
|
while (fetchVer <= endVer) {
|
||||||
if (walFetchHeadNew(pReader, fetchVer) < 0) {
|
if (walFetchHeadNew(pReader, fetchVer) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT ||
|
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT ||
|
||||||
(IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) {
|
(IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) {
|
||||||
if (walFetchBodyNew(pReader) < 0) {
|
if (walFetchBodyNew(pReader) < 0) {
|
||||||
|
@ -98,13 +97,16 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
||||||
if (walSkipFetchBodyNew(pReader) < 0) {
|
if (walSkipFetchBodyNew(pReader) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
fetchVer = pReader->curVersion;
|
fetchVer = pReader->curVersion;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; }
|
int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; }
|
||||||
|
int64_t walReaderGetValidFirstVer(const SWalReader *pReader) { return walGetFirstVer(pReader->pWal); }
|
||||||
|
|
||||||
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
|
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
|
||||||
int64_t ret = 0;
|
int64_t ret = 0;
|
||||||
|
@ -206,7 +208,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
|
int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
|
||||||
SWal *pWal = pReader->pWal;
|
SWal *pWal = pReader->pWal;
|
||||||
if (ver == pReader->curVersion) {
|
if (ver == pReader->curVersion) {
|
||||||
wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);
|
wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);
|
||||||
|
@ -236,7 +238,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
||||||
wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer);
|
wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer);
|
||||||
|
|
||||||
if (pRead->curVersion != fetchVer) {
|
if (pRead->curVersion != fetchVer) {
|
||||||
if (walReadSeekVer(pRead, fetchVer) < 0) {
|
if (walReaderSeekVer(pRead, fetchVer) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
seeked = true;
|
seeked = true;
|
||||||
|
@ -276,6 +278,7 @@ static int32_t walFetchBodyNew(SWalReader *pReader) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader->pHead = ptr;
|
pReader->pHead = ptr;
|
||||||
pReadHead = &pReader->pHead->head;
|
pReadHead = &pReader->pHead->head;
|
||||||
pReader->capacity = pReadHead->bodyLen;
|
pReader->capacity = pReadHead->bodyLen;
|
||||||
|
@ -291,14 +294,11 @@ static int32_t walFetchBodyNew(SWalReader *pReader) {
|
||||||
pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver);
|
pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver);
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
}
|
}
|
||||||
// pRead->curInvalid = 1;
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walValidBodyCksum(pReader->pHead) != 0) {
|
if (walValidBodyCksum(pReader->pHead) != 0) {
|
||||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver);
|
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver);
|
||||||
// pRead->curInvalid = 1;
|
|
||||||
|
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -340,7 +340,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRead->curVersion != ver) {
|
if (pRead->curVersion != ver) {
|
||||||
code = walReadSeekVer(pRead, ver);
|
code = walReaderSeekVer(pRead, ver);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
// pRead->curVersion = ver;
|
// pRead->curVersion = ver;
|
||||||
// pRead->curInvalid = 1;
|
// pRead->curInvalid = 1;
|
||||||
|
@ -475,7 +475,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
|
||||||
taosThreadMutexLock(&pReader->mutex);
|
taosThreadMutexLock(&pReader->mutex);
|
||||||
|
|
||||||
if (pReader->curVersion != ver) {
|
if (pReader->curVersion != ver) {
|
||||||
if (walReadSeekVer(pReader, ver) < 0) {
|
if (walReaderSeekVer(pReader, ver) < 0) {
|
||||||
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
|
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
|
||||||
taosThreadMutexUnlock(&pReader->mutex);
|
taosThreadMutexUnlock(&pReader->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
Loading…
Reference in New Issue