fix(stream): disable scan wal when halt is set.
This commit is contained in:
parent
aa8909b267
commit
9a30573b8b
|
@ -1090,14 +1090,17 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// now we can stop the stream task execution
|
// now we can stop the stream task execution
|
||||||
streamTaskHalt(pStreamTask);
|
|
||||||
|
|
||||||
|
int64_t latestVer = 0;
|
||||||
|
taosThreadMutexLock(&pStreamTask->lock);
|
||||||
|
streamTaskHalt(pStreamTask);
|
||||||
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
|
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
|
||||||
pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
|
pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
|
||||||
|
latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
|
||||||
|
taosThreadMutexUnlock(&pStreamTask->lock);
|
||||||
|
|
||||||
// if it's an source task, extract the last version in wal.
|
// if it's an source task, extract the last version in wal.
|
||||||
pRange = &pTask->dataRange.range;
|
pRange = &pTask->dataRange.range;
|
||||||
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
|
|
||||||
done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
|
done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
|
||||||
|
|
||||||
if (done) {
|
if (done) {
|
||||||
|
|
|
@ -308,9 +308,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
|
||||||
|
|
||||||
int32_t walSkipFetchBody(SWalReader *pRead) {
|
int32_t walSkipFetchBody(SWalReader *pRead) {
|
||||||
wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
|
wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
|
||||||
", applied ver:%" PRId64,
|
", applied ver:%" PRId64", 0x%"PRIx64,
|
||||||
pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
|
pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
|
||||||
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);
|
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId);
|
||||||
|
|
||||||
int64_t code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
|
int64_t code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
|
@ -329,7 +329,7 @@ int32_t walFetchBody(SWalReader *pRead) {
|
||||||
int64_t id = pRead->readerId;
|
int64_t id = pRead->readerId;
|
||||||
|
|
||||||
wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
|
wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
|
||||||
", applied ver:%" PRId64 ", %" PRIx64,
|
", applied ver:%" PRId64 ", 0x%" PRIx64,
|
||||||
vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
|
vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
|
||||||
pRead->pWal->vers.appliedVer, id);
|
pRead->pWal->vers.appliedVer, id);
|
||||||
|
|
||||||
|
@ -358,14 +358,14 @@ int32_t walFetchBody(SWalReader *pRead) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReadHead->version != ver) {
|
if (pReadHead->version != ver) {
|
||||||
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64", %"PRIx64, vgId,
|
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64", 0x%"PRIx64, vgId,
|
||||||
pReadHead->version, ver, id);
|
pReadHead->version, ver, id);
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walValidBodyCksum(pRead->pHead) != 0) {
|
if (walValidBodyCksum(pRead->pHead) != 0) {
|
||||||
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, %" PRIx64, vgId, ver, id);
|
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, 0x%" PRIx64, vgId, ver, id);
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue